Find the complete code on GitHub
You can find the code used in this integration directly on OVHcloud's GitHub.

Apache Airflow¶
Note
Our provider supports OVHcloud AI products, but this documentation focuses on AI Endpoints. You can explore the full OVHcloud Apache Airflow documentation.
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. With this provider, you can orchestrate AI workloads on OVHcloud infrastructure directly from your Airflow DAGs.
Installation¶
Prerequisites
- Python >= 3.8
- Apache Airflow >= 2.3.0
- AI Endpoints API key
Install the provider:
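The exact distribution name may differ; assuming it matches the import path used throughout this guide (apache_airflow_provider_ovhcloud_ai), the install would look like this. Check the provider repository for the published package name:

pip install apache-airflow-provider-ovhcloud-ai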
Configuration¶
Setting up Airflow Connection¶
Create an Airflow connection to store your OVHcloud API credentials:
Using Airflow UI¶
- Go to Admin > Connections
- Click the + button to add a new connection
- Fill in the details:
| Attribute | Value |
|---|---|
| Connection Id | ovh_ai_endpoints_default |
| Connection Type | generic |
| Password | Your OVHcloud AI Endpoints API key |
Using Airflow CLI¶
airflow connections add ovh_ai_endpoints_default \
    --conn-type generic \
    --conn-password your-api-token-here
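Alternatively, the connection can be supplied through a standard Airflow environment variable. Since only the password field is used here, a connection URI of the following shape should be enough (URL-encode the token if it contains special characters):

export AIRFLOW_CONN_OVH_AI_ENDPOINTS_DEFAULT='generic://:your-api-token-here@'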
Usage¶
Chat Completions¶
Generate text using Large Language Models:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime

with DAG(
    dag_id='llm_text_generation',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    generate_text = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_response',
        model='gpt-oss-120b',
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
            {"role": "user", "content": "Explain machine learning in simple terms."}
        ],
        temperature=0.7,
        max_tokens=200,
    )
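To sanity-check the task before scheduling it, you can run the DAG once locally with standard Airflow tooling (not provider-specific):

airflow dags test llm_text_generation 2024-01-01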
Advanced Chat Completions¶
Use Jinja templating for dynamic content:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime

with DAG(
    dag_id='dynamic_llm_generation',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Pass data through Airflow context
    analyze_sentiment = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='analyze_sentiment',
        model='gpt-oss-120b',
        messages=[
            {
                "role": "system",
                "content": "You are a sentiment analysis expert. Respond only with: positive, negative, or neutral."
            },
            {
                "role": "user",
                "content": "Analyze the sentiment: {{ dag_run.conf['text'] }}"
            }
        ],
        temperature=0.3,
        max_tokens=10,
    )
Trigger this DAG with configuration:
airflow dags trigger dynamic_llm_generation \
    --conf '{"text": "I love this product! It works great!"}'
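The same run can also be triggered through Airflow's stable REST API. The host, port, and credentials below are placeholders; authentication depends on how your deployment is configured:

curl -X POST "http://localhost:8080/api/v1/dags/dynamic_llm_generation/dagRuns" \
    --user "airflow:airflow" \
    -H "Content-Type: application/json" \
    -d '{"conf": {"text": "I love this product! It works great!"}}'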
Embeddings¶
Create vector embeddings for semantic search:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='create_embeddings',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    embed_text = OVHCloudAIEndpointsEmbeddingOperator(
        task_id='create_embedding',
        model='BGE-M3',
        input="Apache Airflow is a platform to programmatically author, schedule and monitor workflows."
    )
Batch Embeddings¶
Embed multiple texts in a single task:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='batch_embeddings',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    embed_documents = OVHCloudAIEndpointsEmbeddingOperator(
        task_id='embed_documents',
        model='BGE-M3',
        input=[
            "First document to embed",
            "Second document to embed",
            "Third document to embed"
        ]
    )
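To put these vectors to work for semantic search, a downstream task can pull them from XCom and rank the documents against a query embedding. The sketch below continues inside the batch_embeddings DAG above; it assumes each embedding task pushes its vectors (lists of floats) to XCom, and embed_query / rank_by_similarity are illustrative names, not part of the provider. It also needs from airflow.operators.python import PythonOperator among the imports.

    # Embed the search query with the same model as the documents
    embed_query = OVHCloudAIEndpointsEmbeddingOperator(
        task_id='embed_query',
        model='BGE-M3',
        input="Which document should I read first?"
    )

    def rank_by_similarity(**context):
        import math
        ti = context['ti']
        query = ti.xcom_pull(task_ids='embed_query')
        docs = ti.xcom_pull(task_ids='embed_documents')
        # Assumed shapes: one vector (possibly wrapped in a one-element list)
        # for the single query input, a list of vectors for the batched documents.
        if query and isinstance(query[0], list):
            query = query[0]

        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
            return dot / norm if norm else 0.0

        # Return document indices ordered from most to least similar to the query
        return sorted(range(len(docs)), key=lambda i: cosine(query, docs[i]), reverse=True)

    rank_documents = PythonOperator(
        task_id='rank_documents',
        python_callable=rank_by_similarity,
    )

    [embed_documents, embed_query] >> rank_documents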
Advanced Features¶
Task Output and XCom¶
Access operator outputs in downstream tasks:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_llm_response(**context):
    # Pull the response from XCom
    ti = context['ti']
    llm_response = ti.xcom_pull(task_ids='generate_text')
    print(f"LLM said: {llm_response}")
    # Process the response
    return {"processed": True}

with DAG(
    dag_id='xcom_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    generate = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_text',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Say hello!"}
        ],
    )

    process = PythonOperator(
        task_id='process_response',
        python_callable=process_llm_response,
    )

    generate >> process
Error Handling and Retries¶
Configure retries for production workflows:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator
)
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='production_llm_pipeline',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule='@hourly',
    catchup=False,
) as dag:
    generate_text = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_text',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Generate content"}
        ],
        execution_timeout=timedelta(minutes=10),
    )
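The retry policy can also be tuned per task on top of the DAG-level defaults; the arguments below are standard Airflow BaseOperator parameters (not provider-specific), and exponential backoff is a reasonable fit for rate-limited endpoints. A sketch, continuing inside the DAG above:

    # Per-task override of the default retry policy, e.g. for a rate-limited endpoint
    generate_with_backoff = OVHCloudAIEndpointsChatCompletionsOperator(
        task_id='generate_with_backoff',
        model='gpt-oss-120b',
        messages=[
            {"role": "user", "content": "Generate content"}
        ],
        retries=5,
        retry_delay=timedelta(seconds=30),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=10),
        execution_timeout=timedelta(minutes=10),
    )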
Parallel Processing¶
Run multiple AI tasks in parallel:
from airflow import DAG
from apache_airflow_provider_ovhcloud_ai.operators.ai_endpoints import (
    OVHCloudAIEndpointsChatCompletionsOperator,
    OVHCloudAIEndpointsEmbeddingOperator
)
from datetime import datetime

with DAG(
    dag_id='parallel_ai_tasks',
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    tasks = []

    # Generate multiple responses in parallel
    for i in range(5):
        task = OVHCloudAIEndpointsChatCompletionsOperator(
            task_id=f'generate_response_{i}',
            model='gpt-oss-120b',
            messages=[
                {"role": "user", "content": f"Generate idea number {i}"}
            ],
        )
        tasks.append(task)

    # All tasks run in parallel (no dependencies)
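If the parallel outputs need to be combined, a fan-in task can depend on all of them and pull their results from XCom once every generation has finished. A sketch, continuing inside the DAG above (it also needs from airflow.operators.python import PythonOperator among the imports); combine_ideas is an illustrative name:

    # Fan-in: gather every parallel response after all generation tasks complete
    def combine_ideas(**context):
        ti = context['ti']
        responses = ti.xcom_pull(task_ids=[f'generate_response_{i}' for i in range(5)])
        return "\n".join(str(r) for r in responses)

    combine = PythonOperator(
        task_id='combine_ideas',
        python_callable=combine_ideas,
    )

    tasks >> combine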
Going Further¶
Additional Resources¶
- Apache Airflow Official Documentation
- OVHcloud AI Endpoints Documentation
- Provider GitHub Repository
- Provider Documentation
Community and Support¶
- Need support or have questions? Join our Discord in the #ai-endpoint channel
- Found a bug or want to contribute? Visit the GitHub repository
- Ask questions and start discussions in GitHub Discussions
- Report issues on the Issue Tracker