MCP supports multiple transport mechanisms for communication between clients and servers. guMCP implements two primary transports: stdio (standard input/output) and SSE (Server-Sent Events).

Transport Overview

| Transport | Use Case | Connection | State |
|-----------|----------|------------|-------|
| stdio | Local development, CLI tools | Process-based | Stateful |
| SSE | Remote servers, web apps | HTTP-based | Stateful with reconnection |

Stdio Transport

Stdio transport uses standard input and output streams for communication. It’s the simplest transport and works by:
  1. Launching the MCP server as a child process
  2. Sending JSON-RPC messages through stdin
  3. Receiving responses through stdout
  4. Using stderr for logging (not protocol messages)
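
The four steps above can be sketched with the standard library alone. The child process below is a hypothetical stand-in for an MCP server, not guMCP code: it answers each newline-delimited JSON-RPC request on stdout and writes a log line to stderr. The `roundtrip` helper and the `echo` result shape are illustrative assumptions.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for an MCP server: replies to every request line.
CHILD = r'''
import json, sys
for line in sys.stdin:
    req = json.loads(line)
    print("handling " + req["method"], file=sys.stderr)  # logs go to stderr
    resp = {"jsonrpc": "2.0", "id": req["id"], "result": {"echo": req["method"]}}
    sys.stdout.write(json.dumps(resp) + "\n")  # protocol messages go to stdout
    sys.stdout.flush()
'''

def roundtrip(method, request_id=1):
    """Launch the child, send one JSON-RPC request, return (response, logs)."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    request = {"jsonrpc": "2.0", "id": request_id, "method": method}
    # communicate() writes the request, closes stdin, and collects both streams
    out, err = proc.communicate(json.dumps(request) + "\n")
    return json.loads(out), err
```

Because logs and protocol messages travel on separate streams, the client can parse stdout as JSON without filtering out log lines.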

Implementation

guMCP’s stdio server implementation is in src/servers/local.py:
import logging

import mcp.server.stdio

# Assumed logger setup; the excerpt does not show how guMCP configures logging
logger = logging.getLogger(__name__)

async def run_stdio_server(server, get_initialization_options):
    """Run the server using stdin/stdout streams"""
    logger.info("Starting stdio server")
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            get_initialization_options(),
        )

Running a Stdio Server

To launch a guMCP server with stdio transport:
python -m src.servers.local \
  --server slack \
  --user-id local
This command:
  • Loads the specified server module
  • Creates a server instance with the user ID
  • Starts the stdio transport
  • Waits for client connections via stdin/stdout
Stdio transport is process-based: each client connection requires launching a new server process. When the client disconnects, the process terminates.

Server Discovery

The stdio runner dynamically loads servers from the src/servers/ directory:
import importlib.util
import logging
import sys
from pathlib import Path

# Assumed logger setup (not shown in the excerpt)
logger = logging.getLogger(__name__)

async def load_server(server_name):
    """Load a server module by name"""
    servers_dir = Path(__file__).parent.absolute()
    server_dir = servers_dir / server_name
    server_file = server_dir / "main.py"

    if not server_file.exists():
        logger.error(f"Server '{server_name}' not found at {server_file}")
        sys.exit(1)

    # Dynamically import the server module from its file path
    spec = importlib.util.spec_from_file_location(
        f"{server_name}.server",
        server_file
    )
    server_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(server_module)

    return server_module.server, server_module.get_initialization_options
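
To see the file-based import in isolation, the following sketch builds a throwaway `<name>/main.py` layout in a temporary directory and loads it the same way. The `demo` server name, the `load_from_file` helper, and the placeholder values inside the generated file are all made up for illustration.

```python
import importlib.util
import tempfile
from pathlib import Path

def load_from_file(module_name, file_path):
    """Import a module from an explicit file path, without touching sys.path."""
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

def demo():
    """Create a fake server directory, load it, and return what it exposes."""
    with tempfile.TemporaryDirectory() as tmp:
        server_file = Path(tmp) / "demo" / "main.py"
        server_file.parent.mkdir()
        server_file.write_text(
            "server = 'demo-server-factory'\n"
            "def get_initialization_options():\n"
            "    return {'name': 'demo'}\n"
        )
        module = load_from_file("demo.server", server_file)
        return module.server, module.get_initialization_options()
```

The only contract the loader relies on is that each `main.py` defines `server` and `get_initialization_options` at module level.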

Use Cases for Stdio

  • Local development: Fast iteration and debugging
  • CLI tools: Direct integration with command-line applications
  • Desktop AI assistants: Claude Desktop, Cursor, etc.
  • Single-user environments: Personal productivity tools
Use stdio transport during development for easier debugging. You can see logs in stderr while the protocol messages flow through stdin/stdout.

SSE Transport

Server-Sent Events (SSE) transport uses HTTP for communication, enabling:
  • Remote server deployment
  • Multiple concurrent clients
  • Browser-based clients
  • Scalable architecture

Implementation

guMCP’s SSE server implementation is in src/servers/remote.py:
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

# Create SSE transport for each user session
sse_transport = SseServerTransport(
    f"/{server_name}/{session_key_encoded}/messages/"
)

# Handle SSE connection
async with sse_transport.connect_sse(
    request.scope, request.receive, request._send
) as streams:
    await server_instance.run(
        streams[0],  # read stream
        streams[1],  # write stream
        init_options,
    )

Architecture

SSE transport in guMCP uses a two-endpoint architecture:
  1. SSE Endpoint: Long-lived connection for server-to-client messages
    • GET /{server_name}/{session_key}
    • Keeps connection open
    • Streams server responses
  2. Message Endpoint: Client-to-server requests
    • POST /{server_name}/{session_key}/messages/
    • Accepts JSON-RPC messages
    • Returns immediately
# SSE connection route
routes.append(
    Route(f"/{server_name}/{{session_key}}", endpoint=handle_sse)
)

# Message posting route
routes.append(
    Route(
        f"/{server_name}/{{session_key}}/messages/",
        endpoint=handle_message,
        methods=["POST"],
    )
)
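
The doubled braces in the route strings are easy to misread, so here is a small sketch of what they evaluate to. The helper names `route_templates` and `client_urls` and the base URL are assumptions for illustration. In practice an MCP client is normally told where to POST by the server over the SSE stream, so the second URL is shown only to make the path shape concrete.

```python
def route_templates(server_name):
    """Return the (SSE, message) path templates registered for one server."""
    # Doubled braces survive the f-string as literal braces, leaving a
    # {session_key} placeholder for the router to fill in per request.
    sse_path = f"/{server_name}/{{session_key}}"
    message_path = f"/{server_name}/{{session_key}}/messages/"
    return sse_path, message_path

def client_urls(base_url, server_name, session_key):
    """Fill the templates in for one session (illustrative only)."""
    sse_path, message_path = route_templates(server_name)
    return (
        base_url + sse_path.format(session_key=session_key),
        base_url + message_path.format(session_key=session_key),
    )
```

For example, `route_templates("slack")` yields `/slack/{session_key}` and `/slack/{session_key}/messages/`.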

Session Management

The SSE server keeps one server instance per session key, so a client that reconnects with the same key resumes its existing session:
# Store transports and server instances per session
user_session_transports = {}
user_server_instances = {}

# Create or reuse server instance
if session_key not in user_server_instances:
    server_instance = server_factory(user_id, api_key)
    user_server_instances[session_key] = server_instance
else:
    server_instance = user_server_instances[session_key]

# Store transport for this session
user_session_transports[session_key] = sse_transport
Server instances persist across reconnections, maintaining state and credentials. This enables resuming work after network interruptions.
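
The two dictionaries have different lifetimes, which the following sketch tries to make explicit. `SessionRegistry` is a hypothetical wrapper, not a class in guMCP; it assumes a factory with the same `(user_id, api_key)` signature as above.

```python
class SessionRegistry:
    """Track long-lived server instances and short-lived transports."""

    def __init__(self, server_factory):
        self._factory = server_factory
        self.instances = {}   # session_key -> server instance, kept across reconnects
        self.transports = {}  # session_key -> transport of the current connection

    def connect(self, session_key, user_id, api_key, transport):
        # Create the server instance only on the first connection for this key
        if session_key not in self.instances:
            self.instances[session_key] = self._factory(user_id, api_key)
        self.transports[session_key] = transport
        return self.instances[session_key]

    def disconnect(self, session_key):
        # Drop only the transport; the server instance stays for a later reconnect
        self.transports.pop(session_key, None)
```

A reconnect with the same key therefore gets the same instance back, while the transport is replaced with a fresh one.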

Session Keys

Session keys in guMCP identify unique user sessions:
# Session key format: server_name:user_id:api_key
session_key_encoded = request.path_params["session_key"]
session_key = f"{server_name}:{session_key_encoded}"

# Parse user credentials from session key
if ":" in session_key_encoded:
    parts = session_key_encoded.split(":")
    user_id = parts[0]
    api_key = parts[1]
else:
    user_id = session_key_encoded
    api_key = None  # no API key was supplied in the URL
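
As a variation, the parsing can be pulled into a small function that is easy to test. This sketch uses `str.partition`, which treats everything after the first colon as the API key; that is an assumption that differs from the excerpt above, which keeps only the segment between the first and second colon. The name `parse_session_key` is hypothetical.

```python
def parse_session_key(session_key_encoded):
    """Split 'user_id[:api_key]' into (user_id, api_key or None)."""
    user_id, sep, api_key = session_key_encoded.partition(":")
    # sep is empty when no colon was present, meaning no API key was given
    return user_id, (api_key if sep else None)
```

With this variant, a key such as `alice:a:b` yields the API key `a:b` rather than `a`.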

Connection Lifecycle

  1. Connect: Client opens SSE connection with session key
  2. Initialize: Server and client exchange capabilities
  3. Active: Client sends requests via POST, receives responses via SSE
  4. Disconnect: Connection closes; the transport is removed, but the server instance persists
  5. Reconnect: Client can reconnect with same session key
try:
    # Increment metrics
    active_connections.labels(server=server_name).inc()
    connection_total.labels(server=server_name).inc()

    async with sse_transport.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        logger.info(f"SSE connection established for {server_name}")
        await server_instance.run(streams[0], streams[1], init_options)
finally:
    # Clean up transport on disconnect
    user_session_transports.pop(session_key, None)
    # Always undo the increment above so the gauge cannot drift upward
    active_connections.labels(server=server_name).dec()
    logger.info(f"Closed SSE connection for {server_name}")
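
The try/finally pattern matters most when a connection ends with an error. The sketch below reproduces it with a plain dictionary in place of the Prometheus metrics, so it runs without any dependencies; `tracked_connection`, `simulate`, and `run_demo` are hypothetical names.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def tracked_connection(stats):
    """Count a connection as active for exactly as long as the block runs."""
    stats["active"] += 1
    stats["total"] += 1
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike
        stats["active"] -= 1

async def simulate(stats, fail=False):
    async with tracked_connection(stats):
        await asyncio.sleep(0)  # stand-in for serving the session
        if fail:
            raise ConnectionResetError("client went away")

def run_demo():
    """Run one clean and one failing connection, then return the counters."""
    stats = {"active": 0, "total": 0}
    asyncio.run(simulate(stats))
    try:
        asyncio.run(simulate(stats, fail=True))
    except ConnectionResetError:
        pass
    return stats
```

After both runs the active count is back at zero while the total reflects both connections, which is the behavior the gauge and counter are meant to have.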

Running an SSE Server

To start the guMCP SSE server:
python -m src.servers.remote \
  --host 0.0.0.0 \
  --port 8000
This command:
  • Discovers all available servers in src/servers/
  • Creates routes for each server
  • Starts HTTP server on specified host/port
  • Handles multiple concurrent user sessions

Server Discovery

The SSE server automatically discovers and loads all servers:
def discover_servers():
    """Discover and load all servers from the servers directory"""
    servers_dir = Path(__file__).parent.absolute()
    
    for item in servers_dir.iterdir():
        if item.is_dir():
            server_file = item / "main.py"
            if server_file.exists():
                # Load server module
                spec = importlib.util.spec_from_file_location(
                    f"{item.name}.server", 
                    server_file
                )
                server_module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(server_module)
                
                # Store server factory and initialization
                servers[item.name] = {
                    "server": server_module.server,
                    "get_initialization_options": 
                        server_module.get_initialization_options,
                }

Monitoring and Metrics

guMCP’s SSE server exposes Prometheus metrics:
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest
from starlette.responses import Response

# Define metrics
active_connections = Gauge(
    "gumcp_active_connections",
    "Number of active SSE connections",
    ["server"]
)

connection_total = Counter(
    "gumcp_connection_total",
    "Total number of SSE connections",
    ["server"]
)

# Metrics endpoint (runs on port 9091 by default)
@app.route("/metrics")
async def metrics_endpoint(request):
    # Serialize all registered metrics in the Prometheus text format
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Access metrics at http://localhost:9091/metrics to monitor:
  • Active connections per server
  • Total connection count
  • Connection lifecycle events
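
The endpoint returns the Prometheus text format, which is simple enough to inspect by hand. As a rough sketch, the function below extracts per-server values for one metric from such text. The `SAMPLE` payload is invented for illustration (it is not real guMCP output), and `per_server` is a hypothetical helper that only handles the single `server` label used above.

```python
import re

# Invented sample in Prometheus text format; values and server names are made up
SAMPLE = '''\
# HELP gumcp_active_connections Number of active SSE connections
# TYPE gumcp_active_connections gauge
gumcp_active_connections{server="slack"} 2.0
gumcp_active_connections{server="example"} 1.0
'''

# Matches lines of the form: name{server="label"} value
LINE = re.compile(r'^(?P<name>\w+)\{server="(?P<server>[^"]*)"\}\s+(?P<value>\S+)$')

def per_server(text, metric):
    """Map each server label to its value for the given metric name."""
    result = {}
    for line in text.splitlines():
        match = LINE.match(line)  # comment lines starting with '#' never match
        if match and match["name"] == metric:
            result[match["server"]] = float(match["value"])
    return result
```

For anything beyond a quick check, a Prometheus server or an existing parser is a better fit than a hand-written regular expression.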

Use Cases for SSE

  • Production deployments: Scalable server infrastructure
  • Multi-user applications: SaaS platforms and team tools
  • Web-based clients: Browser applications and dashboards
  • Remote access: Connect to servers across networks
SSE transport is ideal for production deployments where you need to serve multiple users simultaneously with persistent state.

Choosing a Transport

Use Stdio When:

  • Developing and testing locally
  • Building single-user desktop applications
  • Integrating with CLI tools
  • Simplicity is more important than scalability

Use SSE When:

  • Deploying to production servers
  • Supporting multiple concurrent users
  • Building web-based applications
  • Requiring session persistence and reconnection
  • Needing monitoring and metrics

Next Steps

  • Learn about Authentication for securing SSE connections
  • Explore the MCP Protocol for understanding the message format
  • See server examples in src/servers/ for implementation patterns