Pipeline Manager API Reference¶
The Heritage Data Processor Pipeline Manager API provides endpoints for creating, configuring, and executing multi-step processing pipelines, with Zenodo integration, metadata management, and file synchronization.
Base URL¶
All endpoints are prefixed with /pipelines (relative to wherever the Blueprint is mounted).
Pipeline CRUD Operations¶
List Pipelines¶
Retrieves all configured pipelines with optional filtering by status.
Endpoint: GET /pipelines
Query Parameters:
- status (string, optional): Filter pipelines by their execution status (e.g., active, inactive, draft)
Response:
[
{
"identifier": "tei_processing_pipeline",
"name": "TEI Processing Pipeline",
"description": "Validates and transforms TEI XML files",
"status": "active",
"zenodoDraftStepEnabled": true,
"zenodoUploadStepEnabled": true,
"zenodoPublishStepEnabled": false,
"steps": [
{
"step_number": 1,
"step_name": "validate_tei",
"component_name": "tei_validator",
"parameters": {"schema": "tei_all"},
"inputMapping": {},
"outputs": []
}
]
}
]
Response Fields:
- identifier (string): Unique identifier for the pipeline
- name (string): Human-readable pipeline name
- description (string): Detailed description of the pipeline purpose
- status (string): Current pipeline status
- zenodoDraftStepEnabled (boolean): Whether draft creation is enabled
- zenodoUploadStepEnabled (boolean): Whether file upload to Zenodo is enabled
- zenodoPublishStepEnabled (boolean): Whether automatic publication is enabled
- steps (array): Ordered list of pipeline processing steps
Status Codes:
- 200 OK: Pipelines retrieved successfully
- 500 Internal Server Error: Failed to retrieve pipelines
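Example cURL (assuming the server runs locally on the default port 5001):

curl "http://localhost:5001/pipelines?status=active"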
Create Pipeline¶
Creates a new pipeline from a JSON definition.
Endpoint: POST /pipelines
Request Body:
{
"identifier": "new_pipeline",
"name": "New Processing Pipeline",
"description": "Pipeline for processing manuscript data",
"status": "draft",
"zenodoDraftStepEnabled": true,
"zenodoUploadStepEnabled": true,
"zenodoPublishStepEnabled": false,
"steps": [
{
"step_number": 1,
"step_name": "validation",
"component_name": "xml_validator",
"parameters": {
"schema": "custom_schema",
"strict_mode": true
},
"inputMapping": {
"input_file": {
"sourceType": "pipelineFile",
"fileId": "source_file"
}
},
"outputs": [
{
"id": "validation_report",
"pattern": "{original_stem}_validation.json",
"outputMapping": {
"mapToZenodo": true,
"zenodoMappings": [
{
"zenodoField": "description",
"jsonKey": "validation.summary"
}
]
}
}
],
"timeout_seconds": 300
}
]
}
Request Parameters:
- identifier (string, required): Unique identifier for the pipeline
- name (string, required): Pipeline name
- description (string, optional): Pipeline description
- status (string, optional): Initial status
- zenodoDraftStepEnabled (boolean, optional): Enable draft creation step
- zenodoUploadStepEnabled (boolean, optional): Enable file upload step
- zenodoPublishStepEnabled (boolean, optional): Enable automatic publication
- steps (array, required): Array of pipeline step definitions
Response:
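A successful creation typically returns a confirmation body; for illustration (actual fields may differ):

{
    "message": "Pipeline created successfully",
    "identifier": "new_pipeline"
}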
Status Codes:
- 201 Created: Pipeline created successfully
- 400 Bad Request: No data provided
- 500 Internal Server Error: Pipeline creation failed
Get Pipeline¶
Retrieves a specific pipeline by its identifier.
Endpoint: GET /pipelines/<identifier>
URL Parameters:
- identifier (string, required): Unique pipeline identifier
Response:
{
"identifier": "tei_processing_pipeline",
"name": "TEI Processing Pipeline",
"description": "Validates and transforms TEI XML files",
"status": "active",
"zenodoDraftStepEnabled": true,
"zenodoUploadStepEnabled": true,
"zenodoPublishStepEnabled": false,
"steps": [...]
}
Status Codes:
- 200 OK: Pipeline retrieved successfully
- 404 Not Found: Pipeline with specified identifier does not exist
- 500 Internal Server Error: Failed to retrieve pipeline
Update Pipeline¶
Updates a complete pipeline definition, replacing all existing configuration.
Endpoint: PUT /pipelines/<identifier>
URL Parameters:
- identifier (string, required): Unique pipeline identifier
Request Body:
Complete pipeline definition JSON (same structure as create pipeline)
Validation:
The endpoint validates that if the request body contains an identifier field, it must match the URL parameter.
Response:
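A successful update typically returns a confirmation body; for illustration (actual fields may differ):

{
    "message": "Pipeline updated successfully",
    "identifier": "tei_processing_pipeline"
}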
Status Codes:
- 200 OK: Pipeline updated successfully
- 400 Bad Request: No data provided or identifier mismatch
- 404 Not Found: Pipeline not found or update failed
- 500 Internal Server Error: Update operation failed
Delete Pipeline¶
Permanently deletes a pipeline and all its configuration.
Endpoint: DELETE /pipelines/<identifier>
URL Parameters:
- identifier (string, required): Unique pipeline identifier
Response:
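Illustrative success body (the actual shape may differ):

{
    "message": "Pipeline deleted successfully"
}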
Status Codes:
- 200 OK: Pipeline deleted successfully
- 404 Not Found: Pipeline with specified identifier does not exist
- 500 Internal Server Error: Deletion failed
Permanent Deletion
This operation cannot be undone. All pipeline configuration and metadata mappings will be permanently removed.
Pipeline Import/Export¶
Import Pipeline from YAML¶
Imports a pipeline definition from an uploaded YAML file.
Endpoint: POST /pipelines/import
Request Format:
Multipart form data with file upload
Form Fields:
- file (file, required): YAML file containing the pipeline definition
Response:
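Illustrative success body (actual fields may differ):

{
    "message": "Pipeline imported successfully"
}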
Status Codes:
- 200 OK: Pipeline imported successfully
- 400 Bad Request: No file provided or empty filename
- 500 Internal Server Error: Import failed due to invalid YAML or database error
Example cURL:
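Assuming the server runs locally on the default port 5001 and the definition is stored in my_pipeline.yaml:

curl -X POST -F "file=@my_pipeline.yaml" "http://localhost:5001/pipelines/import"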
Export Pipeline to YAML¶
Exports a pipeline definition as a downloadable YAML file.
Endpoint: GET /pipelines/<identifier>/export
URL Parameters:
- identifier (string, required): Unique pipeline identifier
Response:
YAML file download with proper headers:
- Content-Type: application/x-yaml
- Content-Disposition: attachment; filename={identifier}.yaml
Example Response Body:
identifier: tei_processing_pipeline
name: TEI Processing Pipeline
description: Validates and transforms TEI XML files
status: active
zenodoDraftStepEnabled: true
zenodoUploadStepEnabled: true
zenodoPublishStepEnabled: false
steps:
  - step_number: 1
    step_name: validate_tei
    component_name: tei_validator
    parameters:
      schema: tei_all
Status Codes:
- 200 OK: Pipeline exported successfully
- 404 Not Found: Pipeline not found
- 500 Internal Server Error: Export failed
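Example cURL, saving the exported YAML locally (default port assumed):

curl -o tei_processing_pipeline.yaml "http://localhost:5001/pipelines/tei_processing_pipeline/export"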
Pipeline Execution¶
Execute Pipeline on Local Files¶
CLI-specific entry point that creates database records from local file paths and executes the pipeline.
Endpoint: POST /pipelines/<identifier>/execute_on_local_files
URL Parameters:
- identifier (string, required): Pipeline identifier to execute
Decorator: @project_required - Requires an active HDPC project
Request Body:
{
"local_file_paths": [
"/path/to/manuscript_001.xml",
"/path/to/manuscript_002.xml"
],
"is_sandbox": true
}
Request Parameters:
- local_file_paths (array, required): List of absolute file paths to process
- is_sandbox (boolean, optional): Whether to use the Zenodo sandbox environment. Defaults to true
Response:
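Responses follow the same success/error shape as the main execute endpoint below; an illustrative success body:

{
    "success": true,
    "message": "Pipeline executed successfully for all files."
}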
Execution Flow:
The endpoint performs the following operations:
- Validates that an HDPC project is loaded
- Retrieves the project ID from the database
- For each local file path:
    - Checks whether the file already exists in the database
    - If not, inserts a new record into the source_files table with metadata
    - Calculates the SHA256 hash and determines the MIME type
- Prepares metadata for each file via an internal call to prepare_metadata_for_file_route
- Retrieves the generated record IDs from the zenodo_records table
- Delegates to the main pipeline execution endpoint
Status Codes:
- 200 OK: Pipeline executed successfully for all files
- 207 Multi-Status: Pipeline completed with some errors
- 400 Bad Request: No project loaded or no local file paths provided
- 500 Internal Server Error: Failed to prepare records or execute pipeline
Execute Pipeline¶
Executes a multi-step pipeline synchronously, supporting both initial record creation and new version creation modes.
Endpoint: POST /pipelines/<identifier>/execute
URL Parameters:
- identifier (string, required): Pipeline identifier to execute
Request Body (Standard Mode):
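In standard mode, the body lists the local record database IDs to process, as in Example 1 below:

{
    "record_ids": [101, 102, 103],
    "is_sandbox": true
}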
Request Body (Versioning Mode):
{
"concept_rec_id": "7891234",
"file_manifest": {
"files_to_keep": ["existing_file.xml"],
"new_source_file_path": "/path/to/new_version.xml"
},
"is_sandbox": false
}
Request Parameters:
- record_ids (array, optional): List of local record database IDs to process in standard mode
- concept_rec_id (string, optional): Zenodo concept record ID for creating new versions
- file_manifest (object, optional): File synchronization configuration for versioning mode
    - files_to_keep (array): Filenames to retain from the previous version
    - new_source_file_path (string): Path to the new source file for this version
- is_sandbox (boolean, optional): Whether to use the Zenodo sandbox. Defaults to true
Response (Success):
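By symmetry with the partial-failure body below, a success response looks like this (illustrative):

{
    "success": true,
    "message": "Pipeline executed successfully."
}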
Response (Partial Failure):
{
"success": false,
"message": "Pipeline finished with errors.",
"error": "Item 102: Component 'xml_validator' execution failed.\nItem 103: Metadata re-preparation failed after pipeline run."
}
Status Codes:
- 200 OK: Pipeline executed successfully for all items
- 207 Multi-Status: Pipeline completed but some items failed
- 400 Bad Request: No record IDs or concept record ID provided
- 404 Not Found: Pipeline not found
- 500 Internal Server Error: Unexpected error during execution
Pipeline Execution Flow¶
The pipeline executor follows this workflow:
Step 1: Draft Creation¶
Versioning Mode: Creates a new version draft via create_new_version_draft, generates new local record ID, and optionally syncs file manifest.
Standard Mode: Optionally creates Zenodo draft via create_api_draft_for_prepared_record if zenodoDraftStepEnabled is true. Backs up existing metadata before execution.
Step 2: Output Directory Setup¶
Creates unique output directory based on Zenodo concept and record IDs, or uses timestamped fallback directory. Maps source file to execution file map for input resolution.
Step 3: Component Execution¶
For each pipeline step:
- Resolves step inputs from the execution file map
- Constructs the component run payload with inputs, parameters, and output directory
- Calls run_component internally via a Flask test request context
- Polls the execution status endpoint synchronously with a timeout
- Retrieves output file paths on completion
- Adds output files to the project database via add_source_files_route
- Maps output file IDs to paths in the execution file map
Step 4: Metadata Override Collection¶
Collects metadata overrides from pipeline output files using the execution file map. Processes outputMapping configurations to extract Zenodo field mappings from JSON output files.
Step 5: Metadata Update¶
With Draft: Updates Zenodo draft metadata via API call using _update_draft_metadata.
Without Draft: Re-prepares metadata with overrides via prepare_metadata_for_file_route.
Step 6: File Upload¶
If zenodoUploadStepEnabled is true, uploads all associated files to Zenodo deposition via upload_files_for_deposition_route.
Step 7: Publication¶
If zenodoPublishStepEnabled is true, publishes the Zenodo record via publish_record.
Execution Status¶
Get Execution Status¶
Retrieves the current status of a specific pipeline execution.
Endpoint: GET /executions/<execution_uuid>/status
URL Parameters:
- execution_uuid (string, required): UUID of the pipeline execution
Response:
{
"execution_uuid": "550e8400-e29b-41d4-a716-446655440000",
"pipeline_identifier": "tei_processing_pipeline",
"status": "running",
"started_at": "2025-10-21T13:30:00Z",
"completed_at": null,
"current_step": 2,
"total_steps": 5
}
Response Fields:
- execution_uuid (string): Unique execution identifier
- pipeline_identifier (string): Identifier of the executed pipeline
- status (string): Current execution status (e.g., running, completed, failed)
- started_at (string): ISO 8601 timestamp of execution start
- completed_at (string, nullable): ISO 8601 timestamp of completion, or null if still running
- current_step (integer): Current step number being executed
- total_steps (integer): Total number of steps in the pipeline
Status Codes:
- 200 OK: Status retrieved successfully
- 404 Not Found: Execution with specified UUID does not exist
- 500 Internal Server Error: Failed to retrieve status
Metadata Management¶
Update Metadata Mapping¶
Updates the Zenodo metadata mapping configuration for a pipeline.
Endpoint: PUT /pipelines/<identifier>/metadata_mapping
URL Parameters:
- identifier (string, required): Pipeline identifier
Request Body:
{
"description_template": "This is a ${zenodo_metadata.metadata.title} with validation results from ${validation_report.summary}",
"zenodoMappings": [
{
"zenodoField": "keywords",
"jsonKey": "metadata.keywords"
},
{
"zenodoField": "notes",
"jsonKey": "validation.notes"
}
]
}
Request Parameters:
- Accepts any valid dictionary structure for metadata mapping configuration
Response:
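Illustrative success body (actual fields may differ):

{
    "message": "Metadata mapping updated successfully"
}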
Status Codes:
- 200 OK: Metadata mapping updated successfully
- 400 Bad Request: Invalid mapping data format (not a dictionary)
- 404 Not Found: Pipeline not found or update failed
- 500 Internal Server Error: Update operation failed
Helper Functions¶
Description Construction¶
Function: _construct_description(template: str, execution_file_map: dict, record_id: int, current_metadata: dict = None) -> str
Constructs Zenodo record descriptions by replacing placeholders with values from output files and Zenodo metadata.
Placeholder Syntax:
- ${file_id.json_key}: Extracts a value from a JSON output file
- ${file_id}: Reads the entire file content
- ${zenodo_metadata.key_path}: Extracts a value from the Zenodo record metadata
Example Template:
This record contains ${zenodo_metadata.metadata.title}.
Validation results: ${validation_report.summary}.
Full report: ${validation_report}
Processing:
Uses a regular expression to find all placeholders matching the ${...} pattern. For file-based placeholders, it reads files from the execution file map; for JSON files, it extracts nested values using dot notation. For Zenodo metadata, it lazy-loads the record from the database if not provided.
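A minimal Python sketch of this resolution logic, assuming execution_file_map maps file IDs to paths and eliding error handling and lazy metadata loading:

import json
import re

def _lookup(data, key_path: str):
    # Minimal dot-notation lookup used by this sketch; the documented
    # _get_nested_value helper (see below) also supports list indices.
    for key in key_path.split("."):
        data = data[key]
    return data

def construct_description(template: str, execution_file_map: dict,
                          zenodo_metadata: dict) -> str:
    # Illustrative re-implementation of the placeholder logic described
    # above; names and error handling are simplified assumptions.
    def resolve(match: re.Match) -> str:
        token = match.group(1)                    # e.g. "validation_report.summary"
        file_id, _, key_path = token.partition(".")
        if file_id == "zenodo_metadata":
            return str(_lookup(zenodo_metadata, key_path))
        with open(execution_file_map[file_id], encoding="utf-8") as fh:
            content = fh.read()
        if not key_path:                          # ${file_id}: whole file content
            return content
        return str(_lookup(json.loads(content), key_path))

    return re.sub(r"\$\{([^}]+)\}", resolve, template)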
Draft Metadata Update¶
Function: _update_draft_metadata(record_id: int, overrides: dict) -> bool
Updates Zenodo draft metadata via API and synchronizes local database.
Process:
- Retrieves current record metadata and sandbox status from database
- Extracts Zenodo API self-link from metadata
- Merges overrides into metadata object
- Sends PUT request to Zenodo draft URL with updated metadata
- Updates local database with confirmed response from Zenodo
- Synchronizes title and version fields
Returns: True on success, False on failure
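A simplified sketch of the merge-and-PUT portion, assuming the Zenodo convention of sending the merged metadata back to the draft's self link (the function name and token handling here are illustrative):

import requests

def update_draft_metadata(draft_url: str, current_metadata: dict,
                          overrides: dict, token: str) -> bool:
    # Merge overrides into the metadata object, then PUT it back.
    merged = {**current_metadata, **overrides}
    resp = requests.put(
        draft_url,
        json={"metadata": merged},
        params={"access_token": token},
        timeout=60,                  # the documented Zenodo API timeout
    )
    return resp.status_code == 200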
File Cleanup¶
Function: _cleanup_old_derived_files(record_id: int)
Deletes all derived files associated with a record before pipeline execution to ensure a clean slate.
Process:
- Retrieves the source file ID for the record
- Finds all derived files in record_files_map (excluding the source file)
- Deletes the derived entries from the record_files_map table
- Deletes the derived entries from the source_files table
- Commits the transaction
Use Case: Prevents accumulation of outdated output files from previous pipeline runs.
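A sketch of this flow against the project database; the column names (record_id, file_id, is_source_file) are assumptions for illustration, not the actual schema:

import sqlite3

def cleanup_old_derived_files(db_path: str, record_id: int) -> None:
    # Collect derived file IDs first, then delete map entries and
    # file rows in the documented order.
    with sqlite3.connect(db_path) as conn:
        rows = conn.execute(
            "SELECT file_id FROM record_files_map "
            "WHERE record_id = ? AND is_source_file = 0",
            (record_id,),
        ).fetchall()
        derived = [(r[0],) for r in rows]
        conn.executemany("DELETE FROM record_files_map WHERE file_id = ?", derived)
        conn.executemany("DELETE FROM source_files WHERE file_id = ?", derived)
        conn.commit()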
Metadata Backup¶
Function: _backup_record_metadata(record_id: int)
Creates a backup of record metadata before pipeline execution.
Process:
- Retrieves the current metadata JSON from the zenodo_records table
- Deletes any existing backups for this record
- Inserts a new backup into the metadata_backups table with a timestamp
- Stores an empty JSON object as a placeholder for the mapping config
Non-Blocking: Errors during backup are logged but do not prevent pipeline execution.
Override Collection¶
Function: _collect_overrides_from_pipeline_outputs(pipeline: dict, execution_file_map: dict) -> dict
Reads pipeline output files to extract metadata values for Zenodo fields.
Process:
- Iterates through all pipeline steps and their outputs
- Filters outputs where mapToZenodo is true
- Retrieves the actual file paths from the execution file map
- Reads and parses the JSON output files
- Extracts values using the configured JSON key paths
- Maps the values to Zenodo metadata fields
Returns: Dictionary mapping Zenodo field names to override values
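A sketch of the collection loop, reusing the outputMapping structure from the pipeline definition examples and the _get_nested_value helper described below:

import json

def collect_overrides(pipeline: dict, execution_file_map: dict) -> dict:
    # Illustrative only; field names follow the pipeline definition
    # examples in this reference.
    overrides = {}
    for step in pipeline.get("steps", []):
        for output in step.get("outputs", []):
            mapping = output.get("outputMapping", {})
            if not mapping.get("mapToZenodo"):
                continue
            path = execution_file_map.get(output["id"])
            if path is None:
                continue
            with open(path, encoding="utf-8") as fh:
                data = json.load(fh)
            for m in mapping.get("zenodoMappings", []):
                value = _get_nested_value(data, m["jsonKey"])  # documented helper, see below
                if value is not None:
                    overrides[m["zenodoField"]] = value
    return overrides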
Nested Value Extraction¶
Function: _get_nested_value(data: dict, key_path: str)
Safely retrieves values from nested dictionaries using dot notation.
Example:
data = {"metadata": {"creators": [{"name": "Smith"}]}}
value = _get_nested_value(data, "metadata.creators.0.name")
# Returns: "Smith"
Returns: Value if found, None if key path does not exist
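A possible implementation consistent with the documented behavior (dot-separated keys, numeric segments indexing into lists, None on a missing path):

def _get_nested_value(data, key_path: str):
    # Walk the structure one path segment at a time.
    current = data
    for key in key_path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None                # any missing segment yields None
    return current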
Input Resolution¶
Function: resolve_step_inputs(step_input_mapping: dict, available_files: dict) -> dict
Resolves actual file paths for a pipeline step's inputs based on its mapping configuration.
Parameters:
- step_input_mapping: Input mapping from the step definition
- available_files: Dictionary tracking available files (e.g., {'source_file': '/path/to/file.xml', 'output_1': '/path/to/result.json'})
Supported Source Types:
- pipelineFile: Resolves a file from the execution file map using fileId
Returns: Dictionary mapping component input names to resolved file paths
Exception: Raises Exception if a required pipeline file cannot be resolved
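A sketch matching the documented contract, handling only the pipelineFile source type:

def resolve_step_inputs(step_input_mapping: dict, available_files: dict) -> dict:
    # Map each component input name to a concrete file path.
    resolved = {}
    for input_name, source in step_input_mapping.items():
        if source.get("sourceType") == "pipelineFile":
            file_id = source.get("fileId")
            if file_id not in available_files:
                raise Exception(
                    f"Required pipeline file '{file_id}' could not be "
                    f"resolved for input '{input_name}'."
                )
            resolved[input_name] = available_files[file_id]
    return resolved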
File Manifest Synchronization¶
Function: _sync_file_manifest(new_draft_data: dict, file_manifest: dict, is_sandbox: bool, new_local_record_id: int)
Synchronizes files in a new Zenodo version draft based on user-provided manifest.
Process:
- Validates that the new source file path exists
- Retrieves the bucket URL from the Zenodo draft metadata
- Deletes unchecked files from the draft via DELETE requests
- Uploads the new source file to the bucket via a PUT request
- Inserts the new source file into the source_files table
- Updates zenodo_records to point to the new source file
- Creates a record-file mapping with uploaded status
Timeout: File upload has 300-second timeout
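A sketch of the upload step alone, assuming the Zenodo files API pattern of PUT-ing raw bytes to {bucket_url}/{filename} (the function name and token handling are illustrative):

import os
import requests

def upload_new_source_file(bucket_url: str, file_path: str, token: str) -> dict:
    # Stream the file body to the draft's bucket.
    filename = os.path.basename(file_path)
    with open(file_path, "rb") as fh:
        resp = requests.put(
            f"{bucket_url}/{filename}",
            data=fh,
            params={"access_token": token},
            timeout=300,             # the documented 300-second upload timeout
        )
    resp.raise_for_status()
    return resp.json()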
Database Integration¶
Pipeline Database Manager¶
The API uses PipelineDatabaseManager for persistent storage of pipeline definitions:
Database Path: databases/pipeline_system.db
Auto-Creation: Database directory is created automatically if it does not exist
Methods Used:
- list_pipelines(status): Lists pipelines with optional filtering
- get_pipeline(identifier): Retrieves full pipeline data
- create_pipeline(data): Creates a new pipeline
- update_pipeline_complete(identifier, data): Updates an entire pipeline
- delete_pipeline(identifier): Deletes a pipeline
- import_from_yaml(yaml_content): Imports a pipeline from YAML
- export_to_yaml(identifier): Exports a pipeline to YAML
- get_execution_status(execution_uuid): Retrieves execution status
- update_pipeline_metadata_mapping(identifier, mapping_data): Updates the metadata mapping
Project Database Access¶
Pipeline execution uses direct SQLite connections to the project database:
Connection Pattern: Context manager with sqlite3.connect(project_manager.db_path)
Row Factory: Set to sqlite3.Row for dictionary-like access
Tables Accessed:
- source_files: Stores file metadata and paths
- zenodo_records: Stores Zenodo record metadata
- record_files_map: Maps files to records
- metadata_backups: Stores metadata backups
- project_info: Stores project configuration
Error Handling¶
Exception Logging¶
All endpoints log exceptions with full stack traces:
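An illustrative pattern (the actual logger configuration and call sites are internal details):

import logging
import traceback

logger = logging.getLogger(__name__)

try:
    result = executor.run(pipeline_identifier)   # hypothetical call for illustration
except Exception as e:
    logger.error(f"Pipeline execution failed: {e}")
    logger.error(traceback.format_exc())         # full stack trace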
Error Response Format¶
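Errors are returned as JSON. Judging from the messages listed below, a typical body carries a single error field, for example:

{
    "error": "Pipeline not found"
}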
Common Error Scenarios¶
Pipeline Not Found:
Response: 404 Not Found with message "Pipeline not found"
No Data Provided:
Response: 400 Bad Request with message "No data provided"
Identifier Mismatch:
Response: 400 Bad Request with message "Identifier in URL and payload do not match"
Component Execution Timeout:
If a component exceeds its configured timeout, the pipeline execution fails with message: "Component '{component_name}' timed out after {timeout} seconds."
Internal API Calls¶
Test Request Context¶
The pipeline executor uses Flask's test_request_context for internal API calls:
with current_app.test_request_context(json=payload):
    response_tuple = run_component(component_name)
    result = response_tuple[0].get_json()
Called Endpoints:
- run_component(component_name): Starts component execution
- add_source_files_route(): Adds derived files to the project
- create_api_draft_for_prepared_record(): Creates Zenodo drafts
- prepare_metadata_for_file_route(): Prepares or re-prepares metadata
- upload_files_for_deposition_route(): Uploads files to Zenodo
- publish_record(): Publishes Zenodo records
External HTTP Requests¶
For component status polling, the executor makes HTTP requests to localhost:
server_port = current_app.config.get("SERVER_PORT", 5001)
status_url = f"http://localhost:{server_port}/api/components/executions/{execution_id}/status"
Polling Interval: 2 seconds between status checks
Timeout: Configurable per step via timeout_seconds (default 300)
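A sketch of this polling loop, using the documented interval and timeout (the completed/failed status values are taken from the execution status fields above):

import time
import requests

def wait_for_completion(status_url: str, timeout_seconds: int = 300) -> dict:
    # Poll the status endpoint until the execution finishes or times out.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = requests.get(status_url, timeout=60).json()
        if status.get("status") in ("completed", "failed"):
            return status
        time.sleep(2)                # documented 2-second polling interval
    raise TimeoutError(f"Component timed out after {timeout_seconds} seconds.")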
Usage Examples¶
Example 1: Create and Execute Simple Pipeline¶
POST /pipelines
Content-Type: application/json
{
"identifier": "simple_validation",
"name": "Simple Validation Pipeline",
"zenodoDraftStepEnabled": false,
"zenodoUploadStepEnabled": false,
"zenodoPublishStepEnabled": false,
"steps": [
{
"step_number": 1,
"step_name": "validate",
"component_name": "xml_validator",
"parameters": {"schema": "tei_all"},
"inputMapping": {
"input_file": {
"sourceType": "pipelineFile",
"fileId": "source_file"
}
},
"outputs": [
{
"id": "validation_result",
"pattern": "{original_stem}_validation.json"
}
]
}
]
}
Execute:
POST /pipelines/simple_validation/execute
Content-Type: application/json
{
"record_ids": [1, 2, 3],
"is_sandbox": true
}
Example 2: Multi-Step Pipeline with Metadata Override¶
POST /pipelines
Content-Type: application/json
{
"identifier": "full_processing",
"name": "Full Processing Pipeline",
"zenodoDraftStepEnabled": true,
"zenodoUploadStepEnabled": true,
"zenodoPublishStepEnabled": false,
"steps": [
{
"step_number": 1,
"step_name": "extract_metadata",
"component_name": "metadata_extractor",
"parameters": {},
"inputMapping": {
"input": {"sourceType": "pipelineFile", "fileId": "source_file"}
},
"outputs": [
{
"id": "extracted_metadata",
"pattern": "{original_stem}_metadata.json",
"outputMapping": {
"mapToZenodo": true,
"zenodoMappings": [
{
"zenodoField": "description",
"jsonKey": "description"
},
{
"zenodoField": "keywords",
"jsonKey": "keywords"
}
]
}
}
]
},
{
"step_number": 2,
"step_name": "generate_preview",
"component_name": "preview_generator",
"parameters": {"format": "html"},
"inputMapping": {
"input": {"sourceType": "pipelineFile", "fileId": "source_file"},
"metadata": {"sourceType": "pipelineFile", "fileId": "extracted_metadata"}
},
"outputs": [
{
"id": "preview",
"pattern": "{original_stem}_preview.html"
}
]
}
]
}
Example 3: Create New Version with File Manifest¶
POST /pipelines/full_processing/execute
Content-Type: application/json
{
"concept_rec_id": "7891234",
"file_manifest": {
"files_to_keep": ["documentation.pdf", "supplementary_data.csv"],
"new_source_file_path": "/data/manuscript_v2.xml"
},
"is_sandbox": false
}
Example 4: Import Pipeline from YAML¶
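Assuming the server runs locally on the default port 5001:

curl -X POST -F "file=@tei_pipeline.yaml" "http://localhost:5001/pipelines/import"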
Response:
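Illustrative (the identifier comes from the uploaded YAML; actual fields may differ):

{
    "message": "Pipeline imported successfully",
    "identifier": "tei_processing_pipeline"
}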
Configuration¶
Global Settings¶
Pipeline Database Path: Configured at module level as PIPELINE_DB_PATH
Server Port: Retrieved from Flask config as SERVER_PORT (default 5001)
Timeouts¶
Component Execution: Configurable per step via timeout_seconds parameter (default 300 seconds)
API Requests: Zenodo API calls use 60-second timeout
File Uploads: Zenodo file uploads use 300-second timeout
Status Polling: 2-second interval between polls