
Find the complete code on GitHub

You can find the complete code used in this integration directly on OVHcloud's GitHub.

Apache Airflow

Note

Our provider supports OVHcloud AI products, but this documentation focuses on AI Endpoints. You can explore the full OVHcloud Apache Airflow documentation for more.

Apache Airflow is a powerful open-source platform to programmatically author, schedule, and monitor workflows. You can seamlessly orchestrate AI workloads on OVHcloud infrastructure directly from your Airflow DAGs.

Installation

Prerequisites

Install the provider with pip:

pip install apache-airflow-provider-ovhcloud-ai

Or with uv:

uv add apache-airflow-provider-ovhcloud-ai
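
To confirm the provider is installed in the same Python environment as Airflow, you can query its version (a minimal check; the distribution name below is the pip package name above):

from importlib.metadata import version

# Prints the installed provider version; raises PackageNotFoundError if the
# package is missing from the current environment.
print(version("apache-airflow-provider-ovhcloud-ai"))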

Configuration

Setting up Airflow Connection

Create an Airflow connection to store your OVHcloud API credentials:

Using Airflow UI

  1. Go to Admin > Connections
  2. Click the + button to add a new connection
  3. Fill in the details:
Attribute         Value
Connection Id     ovh_ai_endpoints_default
Connection Type   generic
Password          Your OVHcloud AI Endpoints API key

Using Airflow CLI

airflow connections add ovh_ai_endpoints_default \
    --conn-type generic \
    --conn-password your-api-token-here
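
Connections can also come from environment variables: Airflow resolves any variable named AIRFLOW_CONN_<CONN_ID_IN_UPPERCASE>. A minimal Python sketch for local testing (the OVH_AI_ENDPOINTS_API_KEY name is only an example; in a real deployment you would export the variable in the Airflow environment itself):

import os
from urllib.parse import quote

# Airflow reads connections from AIRFLOW_CONN_* environment variables.
# The URI form below sets the "generic" connection type and stores the
# (URL-encoded) API key as the password, with no login or host.
api_key = os.environ["OVH_AI_ENDPOINTS_API_KEY"]  # example variable name
os.environ["AIRFLOW_CONN_OVH_AI_ENDPOINTS_DEFAULT"] = f"generic://:{quote(api_key, safe='')}@"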

Usage

Chat Completions

Generate text using Large Language Models:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime

with DAG(
    dag_id='llm_text_generation',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    generate_text = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_response',
        model='gpt-oss-120b',
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
            {"role": "user", "content": "Explain machine learning in simple terms."}
        ],
        temperature=0.7,
        max_tokens=200,
    )
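
During development you can run this DAG locally, without a scheduler, by appending a small entry point to the same file (dag.test() is available from Airflow 2.5 onward and assumes the ovh_ai_endpoints_default connection is configured in your environment):

if __name__ == "__main__":
    # Runs a single local execution of the DAG defined above.
    dag.test()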

Advanced Chat Completions

Use Jinja templating for dynamic content:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime

with DAG(
    dag_id='dynamic_llm_generation',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    # Pass data through Airflow context
    analyze_sentiment = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='analyze_sentiment',
        model='gpt-oss-120b',
        messages=[
            {
                "role": "system", 
                "content": "You are a sentiment analysis expert. Respond only with: positive, negative, or neutral."
            },
            {
                "role": "user", 
                "content": "Analyze the sentiment: {{ dag_run.conf['text'] }}"
            }
        ],
        temperature=0.3,
        max_tokens=10,
    )

Trigger this DAG with configuration:

airflow dags trigger dynamic_llm_generation \
    --conf '{"text": "I love this product! It works great!"}'
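
Jinja templating also works with DAG-level params, which give the task a default input when no configuration is passed (with Airflow's default settings, matching keys in dag_run.conf override params at trigger time). A variant of the example above:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime

with DAG(
    dag_id='dynamic_llm_generation_with_params',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    # Default input used when the DAG is triggered without --conf
    params={"text": "I love this product! It works great!"},
) as dag:

    analyze_sentiment = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='analyze_sentiment',
        model='gpt-oss-120b',
        messages=[
            {
                "role": "system",
                "content": "You are a sentiment analysis expert. Respond only with: positive, negative, or neutral."
            },
            {
                "role": "user",
                "content": "Analyze the sentiment: {{ params.text }}"
            }
        ],
        temperature=0.3,
        max_tokens=10,
    )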

Embeddings

Create vector embeddings for semantic search:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='create_embeddings',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    embed_text = OVHCloudAIEndpointsEmbeddingOperator(
        task_id='create_embedding',
        model='BGE-M3',
        input="Apache Airflow is a platform to programmatically author, schedule and monitor workflows."
    )

Batch Embeddings

Process multiple texts:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='batch_embeddings',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    embed_documents = OVHCloudAIEndpointsEmbeddingOperator(
        task_id='embed_documents',
        model='BGE-M3',
        input=[
            "First document to embed",
            "Second document to embed",
            "Third document to embed"
        ]
    )
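
    # Optional downstream step: compare the resulting vectors with cosine
    # similarity. Assumption: the embedding task pushes a list of vectors
    # (one per input document) to XCom; adapt the extraction below to the
    # provider's actual payload. The import would normally live at the top
    # of the file.
    from airflow.operators.python import PythonOperator

    def compare_documents(**context):
        vectors = context['ti'].xcom_pull(task_ids='embed_documents')

        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            norm_a = sum(x * x for x in a) ** 0.5
            norm_b = sum(y * y for y in b) ** 0.5
            return dot / (norm_a * norm_b)

        print(f"Similarity(doc 0, doc 1): {cosine(vectors[0], vectors[1]):.4f}")

    compare = PythonOperator(
        task_id='compare_documents',
        python_callable=compare_documents,
    )

    embed_documents >> compare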

Advanced Features

Task Output and XCom

Access operator outputs in downstream tasks:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_llm_response(**context):
    # Pull the response from XCom
    ti = context['ti']
    llm_response = ti.xcom_pull(task_ids='generate_text')
    print(f"LLM said: {llm_response}")

    # Process the response
    return {"processed": True}

with DAG(
    dag_id='xcom_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    generate = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_text',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Say hello!"}
        ],
    )

    process = PythonOperator(
        task_id='process_response',
        python_callable=process_llm_response,
    )

    generate >> process
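
    # XCom values can also be consumed directly in templated operator
    # arguments, for example to chain two LLM calls. Assumption: the first
    # task pushes plain text; if the provider pushes a full API response,
    # extract the text in an intermediate step instead.
    summarize = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='summarize_response',
        model='gpt-oss-120b',
        messages=[
            {
                "role": "user",
                "content": "Summarize in one sentence: {{ ti.xcom_pull(task_ids='generate_text') }}"
            }
        ],
    )

    generate >> summarize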

Error Handling and Retries

Configure retries for production workflows:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='production_llm_pipeline',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule='@hourly',
    catchup=False,
) as dag:

    generate_text = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_text',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Generate content"}
        ],
        execution_timeout=timedelta(minutes=10),
    )
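
    # Optional per-task refinements: standard Airflow retry arguments can
    # override the DAG-level defaults, e.g. exponential backoff between
    # retries for a rate-limited endpoint.
    generate_text_backoff = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_text_backoff',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Generate content"}
        ],
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),
        execution_timeout=timedelta(minutes=10),
    )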

Parallel Processing

Run multiple AI tasks in parallel:

from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator,
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='parallel_ai_tasks',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    tasks = []

    # Generate multiple responses in parallel
    for i in range(5):
        task = OVHCloudAIEndpointsChatCompletionsOperator(
            task_id=f'generate_response_{i}',
            model='gpt-oss-120b',
            messages=[
                {"role": "user", "content": f"Generate idea number {i}"}
            ],
        )
        tasks.append(task)

    # The five generation tasks have no dependencies between them and run in parallel
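
    # Optional fan-in: collect every response in a downstream task while the
    # generation tasks above still run in parallel. The import would normally
    # live at the top of the file.
    from airflow.operators.python import PythonOperator

    def collect_ideas(**context):
        # xcom_pull accepts a list of task_ids and returns the values in the
        # same order; the payload shape depends on what the operator pushes.
        ideas = context['ti'].xcom_pull(
            task_ids=[f'generate_response_{i}' for i in range(5)]
        )
        print(f"Collected {len(ideas)} responses")

    collect = PythonOperator(
        task_id='collect_ideas',
        python_callable=collect_ideas,
    )

    tasks >> collect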

Going Further

Additional Resources

Community and Support