Advanced MCP Topics

This section covers advanced topics and techniques for working with the MCP Python SDK.

Server Lifecycle Management

Managing the lifecycle of your MCP server is crucial for resource management:

from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
import aiohttp
import aiosqlite

from mcp.server import Server
import mcp.types as types

# Define a lifespan context manager
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    print("Server starting up...")

    # Initialize resources
    db = await aiosqlite.connect("database.db")
    session = aiohttp.ClientSession()

    try:
        # Yield resources to request handlers
        yield {
            "db": db,
            "session": session,
        }
    finally:
        # Clean up resources
        print("Server shutting down...")
        await session.close()
        await db.close()

# Create server with the lifespan manager
server = Server("advanced-server", lifespan=server_lifespan)

# Access lifespan context in handlers
@server.call_tool()
async def db_query(name: str, arguments: dict) -> dict:
    # Get the database from the lifespan context
    ctx = server.request_context
    db = ctx.lifespan_context["db"]

    # Use the database connection
    cursor = await db.execute(arguments["query"])
    rows = await cursor.fetchall()
    return {"rows": rows}

Subscriptions and Notifications

MCP supports notifying clients of changes to resources and tools:

from mcp.server.fastmcp import FastMCP
import asyncio

mcp = FastMCP("Notification Demo")

# Track state
messages = []

@mcp.resource("messages://all")
def get_messages() -> list:
    """Get all messages."""
    return messages

@mcp.tool()
async def add_message(content: str) -> dict:
    """Add a new message and notify subscribers."""
    message = {"id": len(messages) + 1, "content": content}
    messages.append(message)

    # Notify that messages resource has changed
    await mcp.notify_resource_changed("messages://all")

    return {"status": "added", "message": message}

@mcp.resource("status://app")
def get_status() -> dict:
    """Get application status."""
    return {"status": "online", "messages": len(messages)}

# Background task to periodically notify of status changes
async def status_updater():
    while True:
        await asyncio.sleep(30)  # Update every 30 seconds
        await mcp.notify_resource_changed("status://app")

# Start the background task
@mcp.on_startup
async def startup():
    asyncio.create_task(status_updater())

WebSocket Support

In addition to the default SSE (Server-Sent Events) support, you can add WebSocket connectivity:

from starlette.applications import Starlette
from starlette.routing import Mount, WebSocketRoute
from starlette.websockets import WebSocket

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("WebSocket Demo")

# Define your resources, tools, and prompts here...

# Starlette app with both SSE and WebSocket endpoints
app = Starlette(
    routes=[
        # Mount the standard SSE endpoint
        Mount('/sse', app=mcp.sse_app()),

        # Custom WebSocket endpoint
        WebSocketRoute('/ws', endpoint=mcp.websocket_endpoint),
    ]
)

# Run with uvicorn
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Authentication and Security

Implementing authentication for your MCP server:

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.authentication import (
    AuthenticationBackend, AuthCredentials, BaseUser, SimpleUser
)
from starlette.routing import Mount
import jwt

from mcp.server.fastmcp import FastMCP

# Create your MCP server
mcp = FastMCP("Secure Server")

# Define a custom authentication backend
class JWTAuthBackend(AuthenticationBackend):
    async def authenticate(self, request):
        if "Authorization" not in request.headers:
            return None

        auth = request.headers["Authorization"]
        if not auth.startswith("Bearer "):
            return None

        token = auth.replace("Bearer ", "")
        try:
            payload = jwt.decode(
                token,
                "your-secret-key",  # Use a proper secret key management in production
                algorithms=["HS256"]
            )
            return AuthCredentials(["authenticated"]), SimpleUser(payload["sub"])
        except jwt.PyJWTError:
            return None

# Create a Starlette app with authentication
app = Starlette(
    routes=[
        Mount('/mcp', app=mcp.sse_app()),
    ],
    middleware=[
        Middleware(AuthenticationMiddleware, backend=JWTAuthBackend())
    ]
)

# Add authentication check in your handlers
@mcp.resource("user://{id}")
def get_user(id: str) -> dict:
    """Get user information."""
    # Access the request context
    request = mcp.get_request()

    # Check authentication
    if not request.user.is_authenticated:
        return {"error": "Authentication required"}

    if request.user.username != id and not request.user.username == "admin":
        return {"error": "Unauthorized access"}

    # Return user data
    return {"id": id, "name": f"User {id}"}

Streaming Responses

Handling streaming responses with MCP:

import asyncio
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Streaming Demo")

@mcp.tool()
async def stream_data(items: int = 5, delay: float = 1.0) -> dict:
    """Demonstrate streaming data with a generator."""
    for i in range(items):
        # Yield incremental progress
        yield {
            "progress": (i + 1) / items,
            "current": i + 1,
            "total": items,
            "data": f"Item {i + 1}"
        }
        await asyncio.sleep(delay)

    # Final result
    return {
        "progress": 1.0,
        "message": f"Completed {items} items",
        "status": "done"
    }

Extending the Protocol

You can extend MCP with experimental capabilities:

from mcp.server import Server
from mcp.server.lowlevel import NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio

# Create a server
server = Server("extended-server")

# Define custom experimental capability handlers
@server.register_method("experimental/customAction")
async def handle_custom_action(params):
    # Handle the custom action
    return {"status": "success", "result": params["value"] * 2}

async def run():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="extended-server",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={
                        "customAction": {}  # Define your experimental capability
                    },
                ),
            ),
        )

Testing MCP Servers

Strategies for testing your MCP servers:

import pytest
import asyncio
from mcp.server.fastmcp import FastMCP
from mcp import ClientSession
from mcp.client.socket import socket_client
from mcp.client.stdio import stdio_client
from mcp import StdioServerParameters

# Server fixture for testing
@pytest.fixture
async def test_server():
    mcp = FastMCP("Test Server")

    @mcp.resource("test://data")
    def test_resource() -> str:
        return "test data"

    @mcp.tool()
    def test_tool(value: str) -> dict:
        return {"result": f"processed: {value}"}

    # Start server in background
    server_task = asyncio.create_task(mcp.run(host="localhost", port=8765))
    await asyncio.sleep(0.5)  # Wait for server to start

    yield mcp

    # Clean up
    server_task.cancel()
    await asyncio.gather(server_task, return_exceptions=True)

# Test using socket client
@pytest.mark.asyncio
async def test_socket_client(test_server):
    async with socket_client({"host": "localhost", "port": 8765}) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize
            await session.initialize()

            # Test resource
            content, mime_type = await session.read_resource("test://data")
            assert content == "test data"

            # Test tool
            result = await session.call_tool("test_tool", arguments={"value": "test"})
            assert result["result"] == "processed: test"

# Test using stdio client
@pytest.mark.asyncio
async def test_stdio_client():
    # Create a simple test script
    with open("test_script.py", "w") as f:
        f.write("""
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Test Server")

@mcp.resource("test://data")
def test_resource() -> str:
    return "test data"

if __name__ == "__main__":
    mcp.run(stdio=True)
""")

    # Connect using stdio
    server_params = StdioServerParameters(
        command="python",
        args=["test_script.py"],
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            content, mime_type = await session.read_resource("test://data")
            assert content == "test data"