Pod and Python app for text embedding with colnomic-embed-multimodal-7b.py
166  .local/share/pytorch_pod/python-apps/colnomic-embed-multimodal-7b.py  Executable file
@@ -0,0 +1,166 @@
#!/usr/bin/env python

import os
from typing import List

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers.utils.import_utils import is_flash_attn_2_available
from colpali_engine.models import ColQwen2_5, ColQwen2_5_Processor

HF_MODEL_ID = os.environ.get("HF_MODEL_ID", "nomic-ai/colnomic-embed-multimodal-7b")
HF_MODEL_URL = os.environ.get("HF_MODEL_URL")
API_PORT = int(os.environ.get("PYTORCH_CONTAINER_PORT", os.environ.get("PORT", "8000")))

app = FastAPI(title="Colnomic Embed Multimodal 7B API")

_model = None
_processor = None
_device = None

def _ensure_model_loaded():
    """
    Lazy-load the ColNomic model and processor on first request.

    Hard requirements for this deployment:
    - CUDA must be available.
    - FlashAttention-2 must be available (flash-attn successfully installed).

    If either is missing, an exception is raised and /health returns 500.
    """
    global _model, _processor, _device

    if _model is not None and _processor is not None:
        return _model, _processor, _device

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available; a CUDA-capable GPU is required.")

    if not is_flash_attn_2_available():
        raise RuntimeError("flash_attn_2 is not available; please install compatible libraries.")

    # Choose dtype: BF16 if supported, otherwise FP16
    dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16

    # Use a single GPU (cuda:0) for now.
    device_map = "cuda:0"

    # Force FlashAttention-2 (we already checked availability above).
    attn_impl = "flash_attention_2"

    model = ColQwen2_5.from_pretrained(
        HF_MODEL_ID,
        torch_dtype=dtype,
        device_map=device_map,
        attn_implementation=attn_impl,
    ).eval()

    processor = ColQwen2_5_Processor.from_pretrained(HF_MODEL_ID)

    _model = model
    _processor = processor
    _device = device_map

    return _model, _processor, _device

class EmbedRequest(BaseModel):
    texts: List[str]


class EmbedResponse(BaseModel):
    model_id: str
    # results[batch][tokens][dim]
    results: List[List[List[float]]]

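# Downstream ranking with these multi-vector embeddings typically uses late
# interaction (MaxSim). A minimal sketch with plain torch (an illustration,
# not part of this API; query_results/doc_results are hypothetical names,
# and colpali_engine also ships a score_multi_vector helper):
#
#   q = torch.tensor(query_results)            # (nq, tq, dim)
#   d = torch.tensor(doc_results)              # (nd, td, dim)
#   sim = torch.einsum("qtd,psd->qpts", q, d)  # all token-pair dot products
#   scores = sim.max(dim=3).values.sum(dim=2)  # MaxSim scores, shape (nq, nd)
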
@app.get("/health")
def health():
    """
    Health check:
    - Reports CUDA and FlashAttention-2 availability.
    - Tries to load the model once (lazy).
    - Returns 200 only if CUDA, FlashAttention-2 and model loading are OK.
    """
    cuda_ok = bool(torch.cuda.is_available())
    flash_ok = bool(is_flash_attn_2_available())

    info = {
        "status": "ok",
        "model_id": HF_MODEL_ID,
        "model_url": HF_MODEL_URL,
        "cuda_available": cuda_ok,
        "flash_attn_2_available": flash_ok,
    }

    # CUDA or FlashAttention missing -> hard failure
    if not cuda_ok:
        info["status"] = "error"
        info["error"] = "CUDA is not available inside the container."
        raise HTTPException(status_code=500, detail=info)

    if not flash_ok:
        info["status"] = "error"
        info["error"] = "flash_attn_2 is not available; this deployment requires FlashAttention-2."
        raise HTTPException(status_code=500, detail=info)

    try:
        _ensure_model_loaded()
    except Exception as exc:  # noqa: BLE001
        info["status"] = "error"
        info["error"] = str(exc)
        raise HTTPException(status_code=500, detail=info) from exc

    return info

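# Example healthy /health response (illustrative values, pretty-printed;
# the defaults above produce these model_id/model_url fields):
#   {
#     "status": "ok",
#     "model_id": "nomic-ai/colnomic-embed-multimodal-7b",
#     "model_url": "https://huggingface.co/nomic-ai/colnomic-embed-multimodal-7b",
#     "cuda_available": true,
#     "flash_attn_2_available": true
#   }
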
@app.post("/embed", response_model=EmbedResponse)
def embed(request: EmbedRequest):
    """
    Compute multi-vector embeddings for a list of texts.

    Result shape: results[batch][tokens][dim] (multi-vector per text).
    """
    if not request.texts:
        raise HTTPException(status_code=400, detail="texts must not be empty")

    model, processor, device = _ensure_model_loaded()  # noqa: F841 - device kept for future use

    # For queries, use process_queries (as in ColQwen2.5 docs)
    with torch.inference_mode():
        batch = processor.process_queries(request.texts).to(model.device)
        outputs = model(**batch)

    # ColQwen2.5 returns either:
    # - a tensor shaped (batch, tokens, dim), or
    # - an object with .last_hidden_state
    if isinstance(outputs, torch.Tensor):
        embeddings = outputs
    elif hasattr(outputs, "last_hidden_state"):
        embeddings = outputs.last_hidden_state
    else:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected model output type from ColQwen/ColPali: {type(outputs)}",
        )

    if embeddings.dim() == 2:  # (tokens, dim) -> single text
        embeddings = embeddings.unsqueeze(0)
    elif embeddings.dim() != 3:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected embedding shape: {tuple(embeddings.shape)}",
        )

    embeddings = embeddings.detach().cpu().float()
    results = embeddings.tolist()

    return EmbedResponse(model_id=HF_MODEL_ID, results=results)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=API_PORT)
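
For reference, querying the running service from the host might look like the following sketch (it assumes the host port 8086 configured by the deployment script below, and the third-party requests package, which is not part of this commit):

    import requests

    resp = requests.post(
        "http://127.0.0.1:8086/embed",
        json={"texts": ["hello world from colnomic"]},
        timeout=300,
    )
    resp.raise_for_status()
    data = resp.json()
    # One multi-vector per input text: results[batch][tokens][dim]
    vectors = data["results"][0]
    print(data["model_id"], len(vectors), len(vectors[0]))
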
214  bin/create_pod_pytorch.sh  Executable file
@@ -0,0 +1,214 @@
#!/bin/bash

# To be run by user llm to create the pod and container with
# PyTorch + HTTP API, to pull the ColNomic embedding model if missing, and
# to create the systemd services.

set -e

# Environment variables
POD_NAME='pytorch_pod'
CTR_NAME='pytorch_ctr'
# NVIDIA NGC PyTorch container with CUDA 13.0 (25.08 release)
BASE_IMAGE='nvcr.io/nvidia/pytorch:25.08-py3'
CUSTOM_IMAGE='localhost/pytorch-api:25.08-cuda13.0'
HF_MODEL_ID='nomic-ai/colnomic-embed-multimodal-7b'
HF_MODEL_URL='https://huggingface.co/nomic-ai/colnomic-embed-multimodal-7b'
HOST_LOCAL_IP='127.0.0.1'
PYTORCH_HOST_PORT='8086'
PYTORCH_CONTAINER_PORT='8000'
BIND_DIR="$HOME/.local/share/$POD_NAME"
AI_MODELS_DIR="$BIND_DIR/ai-models"
PYTHON_APPS_DIR="$BIND_DIR/python-apps"
USER_SYSTEMD_DIR="$HOME/.config/systemd/user"
CONTAINERFILE="$BIND_DIR/containerfile"
# Path of the API app as seen inside the container ($PYTHON_APPS_DIR is
# bind-mounted at /python-apps; a host path would not resolve in there).
PY_APP='/python-apps/colnomic-embed-multimodal-7b.py'

# Prepare directories
mkdir -p "$AI_MODELS_DIR" "$PYTHON_APPS_DIR" "$USER_SYSTEMD_DIR"

# Generate containerfile
cat >"$CONTAINERFILE" <<'EOF'
# Containerfile for PyTorch + FastAPI + ColPali (ColNomic embed model support)

ARG BASE_IMAGE
FROM ${BASE_IMAGE}

# Hugging Face caches and Python apps directory (bind-mounted at runtime)
ENV HF_HOME=/models/hf \
    TRANSFORMERS_CACHE=/models/hf/transformers \
    PYTHON_APPS_DIR=/python-apps

# Ensure directories exist
RUN mkdir -p /models/hf/transformers /python-apps

# Install git (for colpali) and clean apt lists
RUN apt-get update && \
    apt-get install -y --no-install-recommends git && \
    rm -rf /var/lib/apt/lists/*

# Upgrade pip and install runtime dependencies:
# - fastapi, uvicorn for the HTTP API
# - transformers, accelerate, peft for HF + ColPali ecosystem
# - flash-attn to provide FlashAttention-2 kernels
# - colpali pinned to specific commit, installed WITHOUT deps to avoid
#   overriding the PyTorch provided by the base image.
RUN python -m pip install --upgrade pip && \
    python -m pip install --no-cache-dir \
        fastapi \
        "uvicorn[standard]" \
        transformers \
        accelerate \
        peft && \
    python -m pip install --no-cache-dir flash-attn --no-build-isolation && \
    python -m pip install --no-cache-dir --no-deps \
        "git+https://github.com/illuin-tech/colpali.git@97e389a" && \
    python -m pip cache purge

# Make /python-apps importable by default
ENV PYTHONPATH=/python-apps:${PYTHONPATH}

WORKDIR /workspace

# Default command can be overridden by podman run.
CMD ["bash"]
EOF

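# Note: flash-attn may compile from source if no prebuilt wheel matches the
# torch/CUDA combination shipped in the NGC base image; that can add a long
# build step (an assumption about wheel availability, not verified here).
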
# Build custom container image
podman build \
    --build-arg BASE_IMAGE="$BASE_IMAGE" \
    -t "$CUSTOM_IMAGE" \
    -f "$CONTAINERFILE" \
    "$(dirname "$CONTAINERFILE")"

# Create the pod if it does not exist yet
if ! podman pod exists "$POD_NAME"; then
    podman pod create -n "$POD_NAME" \
        -p "$HOST_LOCAL_IP:$PYTORCH_HOST_PORT:$PYTORCH_CONTAINER_PORT"
    echo "Pod '$POD_NAME' created (rc=$?)"
else
    echo "Pod '$POD_NAME' already exists."
fi

# PyTorch + HTTP API container
# Remove old container (--ignore keeps set -e happy if it does not exist)
podman rm -f --ignore "$CTR_NAME"
# New container
podman run -d --name "$CTR_NAME" --pod "$POD_NAME" \
    --device nvidia.com/gpu=all \
    -e HF_MODEL_ID="$HF_MODEL_ID" \
    -e HF_MODEL_URL="$HF_MODEL_URL" \
    -e PYTORCH_CONTAINER_PORT="$PYTORCH_CONTAINER_PORT" \
    -v "$AI_MODELS_DIR":/models \
    -v "$PYTHON_APPS_DIR":/python-apps \
    "$CUSTOM_IMAGE" \
    python "$PY_APP"

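# Note: --device nvidia.com/gpu=all relies on an NVIDIA CDI spec on the host.
# If it is missing, it can usually be generated with nvidia-ctk (host-specific,
# shown here only as a hint):
#   sudo nvidia-ctk cdi generate --output=/etc/cdi/nvidia.yaml
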
# Wait for API readiness (/health)
HEALTH_URL="http://$HOST_LOCAL_IP:$PYTORCH_HOST_PORT/health"
echo -n "Waiting for PyTorch API at $HEALTH_URL ..."
for attempt in $(seq 1 30); do
    if curl -fsS "$HEALTH_URL" >/dev/null 2>&1; then
        echo "ready."
        break
    fi
    sleep 2
    if [ "$attempt" -eq 30 ]; then
        echo "timeout error." >&2
        echo "Container logs:" >&2
        podman logs "$CTR_NAME" >&2
        exit 1
    fi
done

# Smoke tests

# GPU availability
# Capture the exit code explicitly: a plain GPU_JSON="$(...)" followed by
# GPU_RC=$? would abort the script under set -e before $? could be read.
GPU_RC=0
GPU_JSON="$(
podman exec "$CTR_NAME" python -c '
import json, sys

try:
    import torch
except Exception as e:
    # Exit code 1 -> internal error (import torch failed, etc.)
    print(json.dumps({"error": f"import torch failed: {e}"}))
    sys.exit(1)

data = {
    "cuda_available": bool(torch.cuda.is_available()),
    "device_count": int(torch.cuda.device_count()),
}
print(json.dumps(data))
# Exit code 0 -> cuda_available is True
# Exit code 2 -> cuda_available is False
sys.exit(0 if data["cuda_available"] else 2)
'
)" || GPU_RC=$?
# echo "podman exec exit code: $GPU_RC"
# echo "GPU_JSON: $GPU_JSON"
if [ "$GPU_RC" -eq 0 ]; then
    echo "GPU is available in container $CTR_NAME (cuda_available == true)."
elif [ "$GPU_RC" -eq 2 ]; then
    echo "ERROR: CUDA GPU is NOT available inside the container." >&2
    echo "Details: $GPU_JSON" >&2
    echo "This may be due to missing NVIDIA CDI configuration or SELinux labeling." >&2
    exit 1
else
    echo "ERROR: podman exec GPU test failed (exit code $GPU_RC)." >&2
    echo "Details: $GPU_JSON" >&2
    echo "Container logs for debugging:" >&2
    podman logs "$CTR_NAME" || true
    exit 1
fi

# Python API /health
HEALTH_JSON="$(curl -fsS "$HEALTH_URL")"
echo "$HEALTH_JSON"
if ! printf '%s' "$HEALTH_JSON" | grep -q '"status":"ok"'; then
    echo "ERROR: /health endpoint did not report status \"ok\"." >&2
    exit 1
fi

# Python API /embed
EMBED_URL="http://$HOST_LOCAL_IP:$PYTORCH_HOST_PORT/embed"
EMBED_JSON="$(curl -fsS -X POST "$EMBED_URL" \
    -H "Content-Type: application/json" \
    -d '{"texts":["hello world from colnomic"]}')"
echo "$EMBED_JSON"
if ! printf '%s' "$EMBED_JSON" | grep -q '"results"'; then
    echo "ERROR: /embed endpoint did not return embeddings as expected." >&2
    exit 1
fi

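# Optional: inspect the returned shape with jq (assuming jq is installed;
# it is not required by this script):
#   printf '%s' "$EMBED_JSON" | jq '.results[0] | length'      # tokens
#   printf '%s' "$EMBED_JSON" | jq '.results[0][0] | length'   # dim
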
# Generate systemd service files
cd "$USER_SYSTEMD_DIR"
podman generate systemd --name --new --files "$POD_NAME"
echo "Generated systemd service files (rc=$?)"

# Stop & remove live pod and containers
podman pod stop --ignore --time 15 "$POD_NAME"
podman pod rm -f --ignore "$POD_NAME"
if podman pod exists "$POD_NAME"; then
    echo "ERROR: Pod $POD_NAME still exists." >&2
    exit 1
else
    echo "Stopped & removed live pod $POD_NAME and containers"
fi

# Enable systemd services
systemctl --user daemon-reload
systemctl --user enable --now "pod-${POD_NAME}.service"
systemctl --user is-enabled "pod-${POD_NAME}.service"
systemctl --user is-active "pod-${POD_NAME}.service"
echo "Enabled systemd service pod-${POD_NAME}.service (rc=$?)"
echo "To view status: systemctl --user status pod-${POD_NAME}.service"
echo "To view logs: journalctl --user -u pod-${POD_NAME}.service -f"
systemctl --user enable --now "container-${CTR_NAME}.service"
systemctl --user is-enabled "container-${CTR_NAME}.service"
systemctl --user is-active "container-${CTR_NAME}.service"
echo "Enabled systemd service container-${CTR_NAME}.service (rc=$?)"
echo "To view status: systemctl --user status container-${CTR_NAME}.service"
echo "To view logs: journalctl --user -u container-${CTR_NAME}.service -f"

echo "PyTorch API is reachable at http://$HOST_LOCAL_IP:$PYTORCH_HOST_PORT"