Pipeline Manager API Reference

The Heritage Data Processor Pipeline Manager API provides comprehensive endpoints for creating, configuring, and executing multi-step processing pipelines with Zenodo integration, metadata management, and file synchronization.

Base URL

All endpoint paths below are relative to the Blueprint mounting point. In this document's examples the Blueprint is mounted at /api, so GET /pipelines resolves to http://localhost:5001/api/pipelines.


Pipeline CRUD Operations

List Pipelines

Retrieves all configured pipelines with optional filtering by status.

Endpoint: GET /pipelines

Query Parameters:

  • status (string, optional): Filter pipelines by their execution status (e.g., active, inactive, draft)
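
Example request, filtering for active pipelines (this assumes the Blueprint is mounted at /api, as in the cURL examples later in this document):

curl "http://localhost:5001/api/pipelines?status=active"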

Response:

[
  {
    "identifier": "tei_processing_pipeline",
    "name": "TEI Processing Pipeline",
    "description": "Validates and transforms TEI XML files",
    "status": "active",
    "zenodoDraftStepEnabled": true,
    "zenodoUploadStepEnabled": true,
    "zenodoPublishStepEnabled": false,
    "steps": [
      {
        "step_number": 1,
        "step_name": "validate_tei",
        "component_name": "tei_validator",
        "parameters": {"schema": "tei_all"},
        "inputMapping": {},
        "outputs": []
      }
    ]
  }
]

Response Fields:

  • identifier (string): Unique identifier for the pipeline
  • name (string): Human-readable pipeline name
  • description (string): Detailed description of the pipeline purpose
  • status (string): Current pipeline status
  • zenodoDraftStepEnabled (boolean): Whether draft creation is enabled
  • zenodoUploadStepEnabled (boolean): Whether file upload to Zenodo is enabled
  • zenodoPublishStepEnabled (boolean): Whether automatic publication is enabled
  • steps (array): Ordered list of pipeline processing steps

Status Codes:

  • 200 OK: Pipelines retrieved successfully
  • 500 Internal Server Error: Failed to retrieve pipelines

Create Pipeline

Creates a new pipeline from a JSON definition.

Endpoint: POST /pipelines

Request Body:

{
  "identifier": "new_pipeline",
  "name": "New Processing Pipeline",
  "description": "Pipeline for processing manuscript data",
  "status": "draft",
  "zenodoDraftStepEnabled": true,
  "zenodoUploadStepEnabled": true,
  "zenodoPublishStepEnabled": false,
  "steps": [
    {
      "step_number": 1,
      "step_name": "validation",
      "component_name": "xml_validator",
      "parameters": {
        "schema": "custom_schema",
        "strict_mode": true
      },
      "inputMapping": {
        "input_file": {
          "sourceType": "pipelineFile",
          "fileId": "source_file"
        }
      },
      "outputs": [
        {
          "id": "validation_report",
          "pattern": "{original_stem}_validation.json",
          "outputMapping": {
            "mapToZenodo": true,
            "zenodoMappings": [
              {
                "zenodoField": "description",
                "jsonKey": "validation.summary"
              }
            ]
          }
        }
      ],
      "timeout_seconds": 300
    }
  ]
}

Request Parameters:

  • identifier (string, required): Unique identifier for the pipeline
  • name (string, required): Pipeline name
  • description (string, optional): Pipeline description
  • status (string, optional): Initial status
  • zenodoDraftStepEnabled (boolean, optional): Enable draft creation step
  • zenodoUploadStepEnabled (boolean, optional): Enable file upload step
  • zenodoPublishStepEnabled (boolean, optional): Enable automatic publication
  • steps (array, required): Array of pipeline step definitions
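
Example request, posting a definition saved in a local file (the filename pipeline.json is illustrative; assumes the /api mount):

curl -X POST http://localhost:5001/api/pipelines \
  -H "Content-Type: application/json" \
  -d @pipeline.json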

Response:

{
  "success": true,
  "pipeline_id": "new_pipeline"
}

Status Codes:

  • 201 Created: Pipeline created successfully
  • 400 Bad Request: No data provided
  • 500 Internal Server Error: Pipeline creation failed

Get Pipeline

Retrieves a specific pipeline by its identifier.

Endpoint: GET /pipelines/<identifier>

URL Parameters:

  • identifier (string, required): Unique pipeline identifier

Response:

{
  "identifier": "tei_processing_pipeline",
  "name": "TEI Processing Pipeline",
  "description": "Validates and transforms TEI XML files",
  "status": "active",
  "zenodoDraftStepEnabled": true,
  "zenodoUploadStepEnabled": true,
  "zenodoPublishStepEnabled": false,
  "steps": [...]
}

Status Codes:

  • 200 OK: Pipeline retrieved successfully
  • 404 Not Found: Pipeline with specified identifier does not exist
  • 500 Internal Server Error: Failed to retrieve pipeline

Update Pipeline

Updates a complete pipeline definition, replacing all existing configuration.

Endpoint: PUT /pipelines/<identifier>

URL Parameters:

  • identifier (string, required): Unique pipeline identifier

Request Body:

A complete pipeline definition in JSON (the same structure as for Create Pipeline)

Validation:

The endpoint validates that if the request body contains an identifier field, it must match the URL parameter.

Response:

{
  "success": true,
  "message": "Pipeline updated successfully"
}

Status Codes:

  • 200 OK: Pipeline updated successfully
  • 400 Bad Request: No data provided or identifier mismatch
  • 404 Not Found: Pipeline not found or update failed
  • 500 Internal Server Error: Update operation failed

Delete Pipeline

Permanently deletes a pipeline and all its configuration.

Endpoint: DELETE /pipelines/<identifier>

URL Parameters:

  • identifier (string, required): Unique pipeline identifier

Response:

{
  "success": true,
  "message": "Pipeline deleted successfully"
}

Status Codes:

  • 200 OK: Pipeline deleted successfully
  • 404 Not Found: Pipeline with specified identifier does not exist
  • 500 Internal Server Error: Deletion failed

Permanent Deletion

This operation cannot be undone. All pipeline configuration and metadata mappings will be permanently removed.


Pipeline Import/Export

Import Pipeline from YAML

Imports a pipeline definition from an uploaded YAML file.

Endpoint: POST /pipelines/import

Request Format:

Multipart form data with file upload

Form Fields:

  • file (file, required): YAML file containing pipeline definition

Response:

{
  "success": true,
  "pipeline_id": "imported_pipeline_name"
}

Status Codes:

  • 200 OK: Pipeline imported successfully
  • 400 Bad Request: No file provided or empty filename
  • 500 Internal Server Error: Import failed due to invalid YAML or database error

Example cURL:

curl -X POST http://localhost:5001/api/pipelines/import \
  -F "file=@pipeline_definition.yaml"

Export Pipeline to YAML

Exports a pipeline definition as a downloadable YAML file.

Endpoint: GET /pipelines/<identifier>/export

URL Parameters:

  • identifier (string, required): Unique pipeline identifier

Response:

A YAML file download with the following headers:

  • Content-Type: application/x-yaml
  • Content-Disposition: attachment; filename={identifier}.yaml
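
Example request that saves the download under the filename supplied in the Content-Disposition header (assumes the /api mount):

curl -OJ http://localhost:5001/api/pipelines/tei_processing_pipeline/export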

Example Response Body:

identifier: tei_processing_pipeline
name: TEI Processing Pipeline
description: Validates and transforms TEI XML files
status: active
zenodoDraftStepEnabled: true
zenodoUploadStepEnabled: true
zenodoPublishStepEnabled: false
steps:
  - step_number: 1
    step_name: validate_tei
    component_name: tei_validator
    parameters:
      schema: tei_all

Status Codes:

  • 200 OK: Pipeline exported successfully
  • 404 Not Found: Pipeline not found
  • 500 Internal Server Error: Export failed

Pipeline Execution

Execute Pipeline on Local Files

CLI-specific entry point that creates database records from local file paths and executes the pipeline.

Endpoint: POST /pipelines/<identifier>/execute_on_local_files

URL Parameters:

  • identifier (string, required): Pipeline identifier to execute

Decorator: @project_required - Requires an active HDPC project

Request Body:

{
  "local_file_paths": [
    "/path/to/manuscript_001.xml",
    "/path/to/manuscript_002.xml"
  ],
  "is_sandbox": true
}

Request Parameters:

  • local_file_paths (array, required): List of absolute file paths to process
  • is_sandbox (boolean, optional): Whether to use Zenodo sandbox environment. Defaults to true
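
Example request (assumes the /api mount):

curl -X POST http://localhost:5001/api/pipelines/tei_processing_pipeline/execute_on_local_files \
  -H "Content-Type: application/json" \
  -d '{"local_file_paths": ["/path/to/manuscript_001.xml"], "is_sandbox": true}'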

Response:

{
  "success": true,
  "message": "Successfully executed pipeline for 2 items."
}

Execution Flow:

The endpoint performs the following operations:

  1. Validates that an HDPC project is loaded
  2. Retrieves the project ID from the database
  3. For each local file path:
       • Checks whether the file already exists in the database
       • If not, calculates the SHA256 hash and MIME type and inserts a new record into the source_files table with this metadata
  4. Prepares metadata for each file via an internal call to prepare_metadata_for_file_route
  5. Retrieves the generated record IDs from the zenodo_records table
  6. Delegates to the main pipeline execution endpoint

Status Codes:

  • 200 OK: Pipeline executed successfully for all files
  • 207 Multi-Status: Pipeline completed with some errors
  • 400 Bad Request: No project loaded or no local file paths provided
  • 500 Internal Server Error: Failed to prepare records or execute pipeline

Execute Pipeline

Executes a multi-step pipeline synchronously, supporting both initial record creation and new version creation modes.

Endpoint: POST /pipelines/<identifier>/execute

URL Parameters:

  • identifier (string, required): Pipeline identifier to execute

Request Body (Standard Mode):

{
  "record_ids": [101, 102, 103],
  "is_sandbox": true
}

Request Body (Versioning Mode):

{
  "concept_rec_id": "7891234",
  "file_manifest": {
    "files_to_keep": ["existing_file.xml"],
    "new_source_file_path": "/path/to/new_version.xml"
  },
  "is_sandbox": false
}

Request Parameters:

  • record_ids (array, optional): List of local record database IDs to process in standard mode
  • concept_rec_id (string, optional): Zenodo concept record ID for creating new versions
  • file_manifest (object, optional): File synchronization configuration for versioning mode
      • files_to_keep (array): Filenames to retain from the previous version
      • new_source_file_path (string): Path to the new source file for this version
  • is_sandbox (boolean, optional): Whether to use the Zenodo sandbox. Defaults to true

Response (Success):

{
  "success": true,
  "message": "Successfully executed pipeline for 3 items."
}

Response (Partial Failure):

{
  "success": false,
  "message": "Pipeline finished with errors.",
  "error": "Item 102: Component 'xml_validator' execution failed.\nItem 103: Metadata re-preparation failed after pipeline run."
}

Status Codes:

  • 200 OK: Pipeline executed successfully for all items
  • 207 Multi-Status: Pipeline completed but some items failed
  • 400 Bad Request: No record IDs or concept record ID provided
  • 404 Not Found: Pipeline not found
  • 500 Internal Server Error: Unexpected error during execution

Pipeline Execution Flow

The pipeline executor follows this comprehensive workflow:

Step 1: Draft Creation

Versioning Mode: Creates a new version draft via create_new_version_draft, generates new local record ID, and optionally syncs file manifest.

Standard Mode: Optionally creates Zenodo draft via create_api_draft_for_prepared_record if zenodoDraftStepEnabled is true. Backs up existing metadata before execution.

Step 2: Output Directory Setup

Creates a unique output directory based on the Zenodo concept and record IDs, or falls back to a timestamped directory. Maps the source file into the execution file map for input resolution.

Step 3: Component Execution

For each pipeline step, the executor does the following (a condensed sketch follows the list):

  1. Resolves step inputs from execution file map
  2. Constructs component run payload with inputs, parameters, and output directory
  3. Calls run_component internally via Flask test request context
  4. Polls execution status endpoint synchronously with timeout
  5. Retrieves output file paths on completion
  6. Adds output files to project database via add_source_files_route
  7. Maps output file IDs to paths in execution file map
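
A condensed sketch of this loop in Python. The helpers run_component_internally and poll_until_done are hypothetical stand-ins for the internal Flask call and the status polling described above, not actual function names:

def execute_steps(pipeline: dict, execution_file_map: dict, output_dir: str):
    for step in sorted(pipeline["steps"], key=lambda s: s["step_number"]):
        # Resolve this step's inputs from files produced so far
        inputs = resolve_step_inputs(step.get("inputMapping", {}), execution_file_map)
        # Build the component payload and start execution internally
        payload = {"inputs": inputs, "parameters": step.get("parameters", {}),
                   "output_dir": output_dir}
        execution_id = run_component_internally(step["component_name"], payload)
        # Poll synchronously until completion or timeout (default 300 s)
        output_paths = poll_until_done(execution_id, step.get("timeout_seconds", 300))
        # Register outputs so later steps can reference them by output id
        for output_def, path in zip(step.get("outputs", []), output_paths):
            execution_file_map[output_def["id"]] = path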

Step 4: Metadata Override Collection

Collects metadata overrides from pipeline output files using the execution file map. Processes outputMapping configurations to extract Zenodo field mappings from JSON output files.

Step 5: Metadata Update

With Draft: Updates Zenodo draft metadata via API call using _update_draft_metadata.

Without Draft: Re-prepares metadata with overrides via prepare_metadata_for_file_route.

Step 6: File Upload

If zenodoUploadStepEnabled is true, uploads all associated files to Zenodo deposition via upload_files_for_deposition_route.

Step 7: Publication

If zenodoPublishStepEnabled is true, publishes the Zenodo record via publish_record.


Execution Status

Get Execution Status

Retrieves the current status of a specific pipeline execution.

Endpoint: GET /executions/<execution_uuid>/status

URL Parameters:

  • execution_uuid (string, required): UUID of the pipeline execution

Response:

{
  "execution_uuid": "550e8400-e29b-41d4-a716-446655440000",
  "pipeline_identifier": "tei_processing_pipeline",
  "status": "running",
  "started_at": "2025-10-21T13:30:00Z",
  "completed_at": null,
  "current_step": 2,
  "total_steps": 5
}

Response Fields:

  • execution_uuid (string): Unique execution identifier
  • pipeline_identifier (string): Identifier of the executed pipeline
  • status (string): Current execution status (e.g., running, completed, failed)
  • started_at (string): ISO 8601 timestamp of execution start
  • completed_at (string, nullable): ISO 8601 timestamp of completion, or null if still running
  • current_step (integer): Current step number being executed
  • total_steps (integer): Total number of steps in the pipeline

Status Codes:

  • 200 OK: Status retrieved successfully
  • 404 Not Found: Execution with specified UUID does not exist
  • 500 Internal Server Error: Failed to retrieve status

Metadata Management

Update Metadata Mapping

Updates the Zenodo metadata mapping configuration for a pipeline.

Endpoint: PUT /pipelines/<identifier>/metadata_mapping

URL Parameters:

  • identifier (string, required): Pipeline identifier

Request Body:

{
  "description_template": "This is a ${zenodo_metadata.metadata.title} with validation results from ${validation_report.summary}",
  "zenodoMappings": [
    {
      "zenodoField": "keywords",
      "jsonKey": "metadata.keywords"
    },
    {
      "zenodoField": "notes",
      "jsonKey": "validation.notes"
    }
  ]
}

Request Parameters:

  • Accepts any valid dictionary structure for metadata mapping configuration
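
Example request, sending a mapping saved in a local file (the filename mapping.json is illustrative; assumes the /api mount):

curl -X PUT http://localhost:5001/api/pipelines/tei_processing_pipeline/metadata_mapping \
  -H "Content-Type: application/json" \
  -d @mapping.json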

Response:

{
  "success": true,
  "message": "Metadata mapping updated successfully."
}

Status Codes:

  • 200 OK: Metadata mapping updated successfully
  • 400 Bad Request: Invalid mapping data format (not a dictionary)
  • 404 Not Found: Pipeline not found or update failed
  • 500 Internal Server Error: Update operation failed

Helper Functions

Description Construction

Function: _construct_description(template: str, execution_file_map: dict, record_id: int, current_metadata: dict = None) -> str

Constructs Zenodo record descriptions by replacing placeholders with values from output files and Zenodo metadata.

Placeholder Syntax:

  • ${file_id.json_key}: Extracts value from JSON output file
  • ${file_id}: Reads entire file content
  • ${zenodo_metadata.key_path}: Extracts value from Zenodo record metadata

Example Template:

This record contains ${zenodo_metadata.metadata.title}. 
Validation results: ${validation_report.summary}.
Full report: ${validation_report}

Processing:

Uses a regex to find all placeholders matching the ${...} pattern. For file-based placeholders, it reads files from the execution file map. For JSON files, it extracts nested values using dot notation. For Zenodo metadata, it lazy-loads the record from the database if not provided.
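
A minimal sketch of this substitution logic, assuming the _get_nested_value helper documented below; error handling and the lazy database load are omitted:

import json
import re
from pathlib import Path

def construct_description(template: str, execution_file_map: dict,
                          zenodo_metadata: dict) -> str:
    def resolve(match: re.Match) -> str:
        token = match.group(1)                    # e.g. "validation_report.summary"
        file_id, _, key_path = token.partition(".")
        if file_id == "zenodo_metadata":
            return str(_get_nested_value(zenodo_metadata, key_path))
        path = execution_file_map.get(file_id)
        if path is None:
            return match.group(0)                 # leave unknown placeholders intact
        if key_path:                              # ${file_id.json_key}
            data = json.loads(Path(path).read_text())
            return str(_get_nested_value(data, key_path))
        return Path(path).read_text()             # ${file_id}: whole file content
    return re.sub(r"\$\{([^}]+)\}", resolve, template)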


Draft Metadata Update

Function: _update_draft_metadata(record_id: int, overrides: dict) -> bool

Updates Zenodo draft metadata via API and synchronizes local database.

Process:

  1. Retrieves current record metadata and sandbox status from database
  2. Extracts Zenodo API self-link from metadata
  3. Merges overrides into metadata object
  4. Sends PUT request to Zenodo draft URL with updated metadata
  5. Updates local database with confirmed response from Zenodo
  6. Synchronizes title and version fields

Returns: True on success, False on failure
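
A hedged sketch of the API call at the core of this process; token handling, the self-link lookup, and the database synchronization steps are elided, and the payload shape is an assumption based on the Zenodo deposition API:

import requests

def update_draft_metadata(draft_url: str, metadata: dict, overrides: dict,
                          access_token: str) -> bool:
    metadata.update(overrides)  # merge overrides into the metadata object
    resp = requests.put(
        draft_url,
        json={"metadata": metadata},
        params={"access_token": access_token},
        timeout=60,  # the documented timeout for Zenodo API calls
    )
    return resp.ok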


File Cleanup

Function: _cleanup_old_derived_files(record_id: int)

Deletes all derived files associated with a record before pipeline execution to ensure a clean slate.

Process:

  1. Retrieves source file ID for the record
  2. Finds all derived files in record_files_map (excluding source file)
  3. Deletes entries from record_files_map table
  4. Deletes entries from source_files table
  5. Commits transaction

Use Case: Prevents accumulation of outdated output files from previous pipeline runs.
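
A sketch of this cleanup under the documented schema; the exact column names are assumptions:

import sqlite3

def cleanup_old_derived_files(db_path: str, record_id: int, source_file_id: int):
    with sqlite3.connect(db_path) as conn:
        derived = conn.execute(
            "SELECT file_id FROM record_files_map WHERE record_id = ? AND file_id != ?",
            (record_id, source_file_id),
        ).fetchall()
        for (file_id,) in derived:
            conn.execute(
                "DELETE FROM record_files_map WHERE record_id = ? AND file_id = ?",
                (record_id, file_id),
            )
            conn.execute("DELETE FROM source_files WHERE file_id = ?", (file_id,))
        conn.commit()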


Metadata Backup

Function: _backup_record_metadata(record_id: int)

Creates a backup of record metadata before pipeline execution.

Process:

  1. Retrieves current metadata JSON from zenodo_records table
  2. Deletes any existing backups for this record
  3. Inserts new backup into metadata_backups table with timestamp
  4. Stores empty JSON object as placeholder for mapping config

Non-Blocking: Errors during backup are logged but do not prevent pipeline execution.
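
A sketch of the backup logic; the column names in metadata_backups and zenodo_records are assumptions based on the documented behavior:

import logging
import sqlite3
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

def backup_record_metadata(db_path: str, record_id: int):
    try:
        with sqlite3.connect(db_path) as conn:
            row = conn.execute(
                "SELECT record_metadata_json FROM zenodo_records WHERE record_id = ?",
                (record_id,),
            ).fetchone()
            if row is None:
                return
            conn.execute("DELETE FROM metadata_backups WHERE record_id = ?", (record_id,))
            conn.execute(
                "INSERT INTO metadata_backups (record_id, metadata_json, mapping_config, created_at) "
                "VALUES (?, ?, ?, ?)",
                (record_id, row[0], "{}", datetime.now(timezone.utc).isoformat()),
            )
            conn.commit()
    except Exception as e:
        # Non-blocking: log the failure and let pipeline execution continue
        logger.warning(f"Metadata backup failed for record {record_id}: {e}")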


Override Collection

Function: _collect_overrides_from_pipeline_outputs(pipeline: dict, execution_file_map: dict) -> dict

Reads pipeline output files to extract metadata values for Zenodo fields.

Process:

  1. Iterates through all pipeline steps and their outputs
  2. Filters outputs where mapToZenodo is true
  3. Retrieves actual file paths from execution file map
  4. Reads and parses JSON output files
  5. Extracts values using configured JSON key paths
  6. Maps values to Zenodo metadata fields

Returns: Dictionary mapping Zenodo field names to override values
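
A sketch of this collection, assuming the step and output structure shown in the Create Pipeline example and the _get_nested_value helper below:

import json
from pathlib import Path

def collect_overrides(pipeline: dict, execution_file_map: dict) -> dict:
    overrides = {}
    for step in pipeline.get("steps", []):
        for output in step.get("outputs", []):
            mapping = output.get("outputMapping", {})
            if not mapping.get("mapToZenodo"):
                continue
            path = execution_file_map.get(output["id"])
            if path is None:
                continue  # output was never produced; skip it
            data = json.loads(Path(path).read_text())
            for m in mapping.get("zenodoMappings", []):
                value = _get_nested_value(data, m["jsonKey"])
                if value is not None:
                    overrides[m["zenodoField"]] = value
    return overrides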


Nested Value Extraction

Function: _get_nested_value(data: dict, key_path: str)

Safely retrieves values from nested dictionaries using dot notation.

Example:

data = {"metadata": {"creators": [{"name": "Smith"}]}}
value = _get_nested_value(data, "metadata.creators.0.name")
# Returns: "Smith"

Returns: Value if found, None if key path does not exist
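
One possible implementation of this traversal; numeric path segments are treated as list indices, matching the example above:

def _get_nested_value(data, key_path: str):
    current = data
    for key in key_path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None
    return current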


Input Resolution

Function: resolve_step_inputs(step_input_mapping: dict, available_files: dict) -> dict

Resolves actual file paths for a pipeline step's inputs based on its mapping configuration.

Parameters:

  • step_input_mapping: Input mapping from step definition
  • available_files: Dictionary tracking available files (e.g., {'source_file': '/path/to/file.xml', 'output_1': '/path/to/result.json'})

Supported Source Types:

  • pipelineFile: Resolves file from execution file map using fileId

Returns: Dictionary mapping component input names to resolved file paths

Exception: Raises Exception if a required pipeline file cannot be resolved
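
A sketch matching the documented behavior; only the pipelineFile source type is handled:

def resolve_step_inputs(step_input_mapping: dict, available_files: dict) -> dict:
    resolved = {}
    for input_name, mapping in step_input_mapping.items():
        if mapping.get("sourceType") == "pipelineFile":
            file_id = mapping["fileId"]
            if file_id not in available_files:
                raise Exception(f"Required pipeline file '{file_id}' could not be resolved")
            resolved[input_name] = available_files[file_id]
    return resolved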


File Manifest Synchronization

Function: _sync_file_manifest(new_draft_data: dict, file_manifest: dict, is_sandbox: bool, new_local_record_id: int)

Synchronizes files in a new Zenodo version draft based on user-provided manifest.

Process:

  1. Validates new source file path exists
  2. Retrieves bucket URL from Zenodo draft metadata
  3. Deletes unchecked files from draft via DELETE requests
  4. Uploads new source file to bucket via PUT request
  5. Inserts new source file into source_files table
  6. Updates zenodo_records to point to new source file
  7. Creates record-file mapping with uploaded status

Timeout: File upload has 300-second timeout
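
A hedged sketch of the HTTP side of this synchronization; token handling and the database updates are omitted, and the shape of the draft's file entries (a filename plus a self link) is an assumption based on the Zenodo deposition API:

import requests
from pathlib import Path

def sync_file_manifest(bucket_url: str, draft_files: list, file_manifest: dict,
                       access_token: str):
    params = {"access_token": access_token}
    keep = set(file_manifest["files_to_keep"])
    # Delete files the user unchecked from the new version draft
    for f in draft_files:
        if f["filename"] not in keep:
            requests.delete(f["links"]["self"], params=params, timeout=60)
    # Upload the new source file to the draft's bucket
    new_path = Path(file_manifest["new_source_file_path"])
    with new_path.open("rb") as fp:
        requests.put(f"{bucket_url}/{new_path.name}", data=fp,
                     params=params, timeout=300)  # documented 300-second upload timeout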


Database Integration

Pipeline Database Manager

The API uses PipelineDatabaseManager for persistent storage of pipeline definitions:

Database Path: databases/pipeline_system.db

Auto-Creation: Database directory is created automatically if it does not exist

Methods Used:

  • list_pipelines(status): Lists pipelines with optional filtering
  • get_pipeline(identifier): Retrieves full pipeline data
  • create_pipeline(data): Creates new pipeline
  • update_pipeline_complete(identifier, data): Updates entire pipeline
  • delete_pipeline(identifier): Deletes pipeline
  • import_from_yaml(yaml_content): Imports pipeline from YAML
  • export_to_yaml(identifier): Exports pipeline to YAML
  • get_execution_status(execution_uuid): Retrieves execution status
  • update_pipeline_metadata_mapping(identifier, mapping_data): Updates metadata mapping

Project Database Access

Pipeline execution uses direct SQLite connections to the project database:

Connection Pattern: Context manager with sqlite3.connect(project_manager.db_path)

Row Factory: Set to sqlite3.Row for dictionary-like access

Tables Accessed:

  • source_files: Stores file metadata and paths
  • zenodo_records: Stores Zenodo record metadata
  • record_files_map: Maps files to records
  • metadata_backups: Stores metadata backups
  • project_info: Stores project configuration
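
The documented connection pattern, sketched as a small helper (the column selection is illustrative):

import sqlite3

def fetch_record(db_path: str, record_id: int):
    with sqlite3.connect(db_path) as conn:
        conn.row_factory = sqlite3.Row  # enables row["column_name"] access
        return conn.execute(
            "SELECT * FROM zenodo_records WHERE record_id = ?", (record_id,)
        ).fetchone()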

Error Handling

Exception Logging

All endpoints use comprehensive error logging with stack traces:

logger.error(f"Error in POST /pipelines/{identifier}/execute: {e}", exc_info=True)

Error Response Format

{
  "error": "Detailed error message"
}

Common Error Scenarios

Pipeline Not Found:

GET /pipelines/nonexistent_pipeline

Response: 404 Not Found with message "Pipeline not found"

No Data Provided:

POST /pipelines
Content-Type: application/json

Response: 400 Bad Request with message "No data provided"

Identifier Mismatch:

PUT /pipelines/pipeline_a
{"identifier": "pipeline_b", ...}

Response: 400 Bad Request with message "Identifier in URL and payload do not match"

Component Execution Timeout:

If a component exceeds its configured timeout, the pipeline execution fails with message: "Component '{component_name}' timed out after {timeout} seconds."


Internal API Calls

Test Request Context

The pipeline executor uses Flask's test_request_context for internal API calls:

with current_app.test_request_context(json=payload):
    response_tuple = run_component(component_name)
    result = response_tuple[0].get_json()

Called Endpoints:

  • run_component(component_name): Starts component execution
  • add_source_files_route(): Adds derived files to project
  • create_api_draft_for_prepared_record(): Creates Zenodo drafts
  • prepare_metadata_for_file_route(): Prepares or re-prepares metadata
  • upload_files_for_deposition_route(): Uploads files to Zenodo
  • publish_record(): Publishes Zenodo records

External HTTP Requests

For component status polling, the executor makes HTTP requests to localhost:

server_port = current_app.config.get("SERVER_PORT", 5001)
status_url = f"http://localhost:{server_port}/api/components/executions/{execution_id}/status"

Polling Interval: 2 seconds between status checks

Timeout: Configurable per step via timeout_seconds (default 300)
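
A sketch of this polling loop; the shape of the status payload is an assumption:

import time
import requests

def poll_execution(execution_id: str, timeout: int = 300, port: int = 5001):
    status_url = f"http://localhost:{port}/api/components/executions/{execution_id}/status"
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = requests.get(status_url, timeout=60).json()
        if status.get("status") in ("completed", "failed"):
            return status
        time.sleep(2)  # documented 2-second polling interval
    raise TimeoutError(f"Component execution {execution_id} timed out after {timeout} seconds")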


Usage Examples

Example 1: Create and Execute Simple Pipeline

POST /pipelines
Content-Type: application/json

{
  "identifier": "simple_validation",
  "name": "Simple Validation Pipeline",
  "zenodoDraftStepEnabled": false,
  "zenodoUploadStepEnabled": false,
  "zenodoPublishStepEnabled": false,
  "steps": [
    {
      "step_number": 1,
      "step_name": "validate",
      "component_name": "xml_validator",
      "parameters": {"schema": "tei_all"},
      "inputMapping": {
        "input_file": {
          "sourceType": "pipelineFile",
          "fileId": "source_file"
        }
      },
      "outputs": [
        {
          "id": "validation_result",
          "pattern": "{original_stem}_validation.json"
        }
      ]
    }
  ]
}

Execute:

POST /pipelines/simple_validation/execute
Content-Type: application/json

{
  "record_ids": [1, 2, 3],
  "is_sandbox": true
}

Example 2: Multi-Step Pipeline with Metadata Override

POST /pipelines
Content-Type: application/json

{
  "identifier": "full_processing",
  "name": "Full Processing Pipeline",
  "zenodoDraftStepEnabled": true,
  "zenodoUploadStepEnabled": true,
  "zenodoPublishStepEnabled": false,
  "steps": [
    {
      "step_number": 1,
      "step_name": "extract_metadata",
      "component_name": "metadata_extractor",
      "parameters": {},
      "inputMapping": {
        "input": {"sourceType": "pipelineFile", "fileId": "source_file"}
      },
      "outputs": [
        {
          "id": "extracted_metadata",
          "pattern": "{original_stem}_metadata.json",
          "outputMapping": {
            "mapToZenodo": true,
            "zenodoMappings": [
              {
                "zenodoField": "description",
                "jsonKey": "description"
              },
              {
                "zenodoField": "keywords",
                "jsonKey": "keywords"
              }
            ]
          }
        }
      ]
    },
    {
      "step_number": 2,
      "step_name": "generate_preview",
      "component_name": "preview_generator",
      "parameters": {"format": "html"},
      "inputMapping": {
        "input": {"sourceType": "pipelineFile", "fileId": "source_file"},
        "metadata": {"sourceType": "pipelineFile", "fileId": "extracted_metadata"}
      },
      "outputs": [
        {
          "id": "preview",
          "pattern": "{original_stem}_preview.html"
        }
      ]
    }
  ]
}

Example 3: Create New Version with File Manifest

POST /pipelines/full_processing/execute
Content-Type: application/json

{
  "concept_rec_id": "7891234",
  "file_manifest": {
    "files_to_keep": ["documentation.pdf", "supplementary_data.csv"],
    "new_source_file_path": "/data/manuscript_v2.xml"
  },
  "is_sandbox": false
}

Example 4: Import Pipeline from YAML

curl -X POST http://localhost:5001/api/pipelines/import \
  -F "file=@my_pipeline.yaml"

Response:

{
  "success": true,
  "pipeline_id": "imported_pipeline"
}

Configuration

Global Settings

Pipeline Database Path: Configured at module level as PIPELINE_DB_PATH

Server Port: Retrieved from Flask config as SERVER_PORT (default 5001)

Timeouts

Component Execution: Configurable per step via timeout_seconds parameter (default 300 seconds)

API Requests: Zenodo API calls use 60-second timeout

File Uploads: Zenodo file uploads use 300-second timeout

Status Polling: 2-second interval between polls