Remote GraphQL#
graphql-api provides powerful capabilities for working with remote GraphQL services, enabling you to create distributed GraphQL architectures, proxy remote APIs, and build sophisticated microservices integrations.
Overview#
Remote GraphQL features allow you to:
- Connect to external GraphQL APIs
- Integrate remote services into your local schema
- Build API gateways and schema federation
- Create distributed GraphQL architectures
- Switch between local and remote implementations
GraphQLRemoteExecutor#
The GraphQLRemoteExecutor class connects to and executes queries against remote GraphQL services:
Basic Usage#
from graphql_api.remote import GraphQLRemoteExecutor
# Connect to a remote GraphQL API
remote_api = GraphQLRemoteExecutor(
url="https://api.example.com/graphql",
http_method="POST", # or "GET"
verify=True, # SSL certificate verification
http_headers={
"Authorization": "Bearer your-token",
"Content-Type": "application/json"
}
)
# Execute queries directly
result = remote_api.execute('''
query {
user(id: "123") {
name
email
posts {
title
content
}
}
}
''')
print(result.data) # Access query results
if result.errors:
print("Errors:", result.errors)Configuration Options#
remote_api = GraphQLRemoteExecutor(
url="https://api.example.com/graphql",
http_method="POST", # HTTP method: "GET" or "POST"
verify=True, # SSL certificate verification
timeout=30, # Request timeout in seconds
http_headers={ # Custom headers
"Authorization": "Bearer token",
"User-Agent": "MyApp/1.0",
"X-API-Key": "api-key"
}
)Integrating Remote APIs#
Basic Integration#
Integrate remote APIs as fields in your local GraphQL schema:
from graphql_api.api import GraphQLAPI
from graphql_api.remote import GraphQLRemoteExecutor, remote_execute
from graphql_api.context import GraphQLContext
api = GraphQLAPI()
# External API connection
external_api = GraphQLRemoteExecutor(
url="https://api.external-service.com/graphql",
http_headers={"API-Key": "your-api-key"}
)
@api.type(is_root_type=True)
class Root:
@api.field
def external_data(self, context: GraphQLContext) -> external_api: # type: ignore[valid-type]
"""
Forward queries to external API.
The remote_execute helper automatically forwards the current query
to the remote service, maintaining field selection and arguments.
"""
return remote_execute(executor=external_api, context=context)
@api.field
def custom_external_query(self) -> dict:
"""Execute a specific query against the remote API."""
result = external_api.execute('''
query {
specificData {
id
value
metadata {
created_at
updated_at
}
}
}
''')
return result.data if result.data else {}Advanced Integration Patterns#
@api.type(is_root_type=True)
class Root:
@api.field
def user_profile(self, user_id: str) -> dict:
"""Combine local and remote data."""
# Get local user data
local_user = get_local_user(user_id)
# Get remote profile data
remote_result = external_api.execute(f'''
query {{
userProfile(id: "{user_id}") {{
preferences
settings
externalData
}}
}}
''')
# Combine data
return {
"id": local_user.id,
"name": local_user.name,
"email": local_user.email,
"profile": remote_result.data.get("userProfile", {})
}
@api.field
def search_combined(self, query: str) -> dict:
"""Search across multiple remote services."""
# Search users service
users_result = user_service.execute(f'''
query {{
searchUsers(query: "{query}") {{
id
name
type
}}
}}
''')
# Search content service
content_result = content_service.execute(f'''
query {{
searchContent(query: "{query}") {{
id
title
type
}}
}}
''')
return {
"users": users_result.data.get("searchUsers", []),
"content": content_result.data.get("searchContent", [])
}Async Remote Execution#
All remote operations support async execution for better performance:
import asyncio
from graphql_api.remote import GraphQLRemoteExecutor
async def fetch_remote_data():
remote_api = GraphQLRemoteExecutor(
url="https://api.example.com/graphql",
http_headers={"Authorization": "Bearer token"}
)
# Async execution
result = await remote_api.execute_async('''
query {
users(first: 10) {
id
name
email
}
}
''')
return result.data
# Async field resolver
@api.type(is_root_type=True)
class Root:
@api.field
async def async_external_data(self) -> dict:
"""Async remote data fetching."""
remote_api = GraphQLRemoteExecutor(
url="https://api.external.com/graphql"
)
result = await remote_api.execute_async('''
query {
dashboard {
metrics
alerts
status
}
}
''')
return result.data.get("dashboard", {})
@api.field
async def parallel_remote_calls(self) -> dict:
"""Make multiple remote calls in parallel."""
# Create multiple remote executors
service_a = GraphQLRemoteExecutor(url="https://service-a.com/graphql")
service_b = GraphQLRemoteExecutor(url="https://service-b.com/graphql")
service_c = GraphQLRemoteExecutor(url="https://service-c.com/graphql")
# Execute in parallel
results = await asyncio.gather(
service_a.execute_async('query { dataA { value } }'),
service_b.execute_async('query { dataB { value } }'),
service_c.execute_async('query { dataC { value } }')
)
return {
"serviceA": results[0].data,
"serviceB": results[1].data,
"serviceC": results[2].data
}
# Usage
data = asyncio.run(fetch_remote_data())Federation and Schema Stitching#
Combine multiple remote GraphQL APIs into a unified schema:
api = GraphQLAPI()
# Multiple remote services
user_service = GraphQLRemoteExecutor(
url="https://users.example.com/graphql",
http_headers={"Service": "user-service"}
)
order_service = GraphQLRemoteExecutor(
url="https://orders.example.com/graphql",
http_headers={"Service": "order-service"}
)
product_service = GraphQLRemoteExecutor(
url="https://products.example.com/graphql",
http_headers={"Service": "product-service"}
)
@api.type(is_root_type=True)
class Root:
@api.field
def users(self, context: GraphQLContext) -> user_service: # type: ignore[valid-type]
"""Forward user queries to user service."""
return remote_execute(executor=user_service, context=context)
@api.field
def orders(self, context: GraphQLContext) -> order_service: # type: ignore[valid-type]
"""Forward order queries to order service."""
return remote_execute(executor=order_service, context=context)
@api.field
def products(self, context: GraphQLContext) -> product_service: # type: ignore[valid-type]
"""Forward product queries to product service."""
return remote_execute(executor=product_service, context=context)
@api.field
def user_with_orders(self, user_id: str) -> dict:
"""Combine data from multiple services."""
# Get user data
user_result = user_service.execute(f'''
query {{
user(id: "{user_id}") {{
id
name
email
}}
}}
''')
# Get user's orders
orders_result = order_service.execute(f'''
query {{
orders(userId: "{user_id}") {{
id
total
status
items {{
productId
quantity
price
}}
}}
}}
''')
return {
"user": user_result.data.get("user"),
"orders": orders_result.data.get("orders", [])
}
@api.field
async def enhanced_user_profile(self, user_id: str) -> dict:
"""Combine data from all services asynchronously."""
# Execute queries in parallel
user_task = user_service.execute_async(f'''
query {{ user(id: "{user_id}") {{ id name email }} }}
''')
orders_task = order_service.execute_async(f'''
query {{ orders(userId: "{user_id}") {{ id total status }} }}
''')
# Wait for all results
user_result, orders_result = await asyncio.gather(user_task, orders_task)
# Get product details for orders
order_items = []
if orders_result.data and orders_result.data.get("orders"):
product_ids = []
for order in orders_result.data["orders"]:
for item in order.get("items", []):
product_ids.append(item["productId"])
if product_ids:
products_result = await product_service.execute_async(f'''
query {{
products(ids: {product_ids}) {{
id
name
price
}}
}}
''')
order_items = products_result.data.get("products", [])
return {
"user": user_result.data.get("user"),
"orders": orders_result.data.get("orders", []),
"products": order_items
}GraphQLRemoteObject#
Make local objects behave like remote GraphQL queries for testing or abstraction:
from graphql_api.remote import GraphQLRemoteObject
api = GraphQLAPI()
@api.type(is_root_type=True)
class House:
@api.field
def number_of_doors(self) -> int:
return 5
@api.field
def address(self) -> str:
return "123 Main St"
@api.field
def rooms(self) -> List[str]:
return ["kitchen", "bedroom", "bathroom"]
# Create a remote-like object that queries the local API
house: House = GraphQLRemoteObject(executor=api.executor(), api=api)
# Use like a regular object, but it executes GraphQL queries behind the scenes
doors = house.number_of_doors() # Executes: query { numberOfDoors }
address = house.address() # Executes: query { address }
rooms = house.rooms() # Executes: query { rooms }
assert doors == 5
assert address == "123 Main St"
assert "kitchen" in roomsUse cases for GraphQLRemoteObject:
- Testing with mock remote services
- Abstracting local vs remote data sources
- Development environments with local fallbacks
- Service migration scenarios
Error Handling#
Remote GraphQL operations handle errors gracefully:
def handle_remote_errors():
try:
result = remote_api.execute('''
query {
user(id: "invalid") {
name
}
}
''')
if result.errors:
print("GraphQL errors:", result.errors)
# Handle GraphQL-level errors
if result.data:
print("Data:", result.data)
except Exception as e:
print("Network or execution error:", e)
# Handle network errors, timeouts, etc.
# Error handling in resolvers
@api.type(is_root_type=True)
class Root:
@api.field
def safe_remote_data(self) -> Optional[dict]:
"""Safe remote data fetching with error handling."""
try:
result = external_api.execute('''
query {
criticalData {
value
}
}
''')
if result.errors:
# Log errors but don't fail
print(f"Remote API errors: {result.errors}")
return None
return result.data
except Exception as e:
# Network error - return None instead of failing
print(f"Failed to fetch remote data: {e}")
return NoneBest Practices#
Use connection pooling for performance:
# Reuse the same executor instance
class RemoteServices:
def __init__(self):
self.user_service = GraphQLRemoteExecutor(
url="https://users.example.com/graphql"
)
self.order_service = GraphQLRemoteExecutor(
url="https://orders.example.com/graphql"
)
# Create once, use many times
services = RemoteServices()Handle authentication properly:
def get_remote_executor_with_auth(context: GraphQLContext):
"""Create authenticated remote executor."""
user = getattr(context, 'current_user', None)
headers = {"Content-Type": "application/json"}
if user and user.api_token:
headers["Authorization"] = f"Bearer {user.api_token}"
return GraphQLRemoteExecutor(
url="https://api.example.com/graphql",
http_headers=headers
)
@api.field
def authenticated_remote_data(self, context: GraphQLContext) -> dict:
remote_api = get_remote_executor_with_auth(context)
result = remote_api.execute('query { privateData { value } }')
return result.dataImplement caching for remote calls:
import time
from typing import Dict, Any
# Simple cache implementation
remote_cache: Dict[str, tuple] = {}
CACHE_TTL = 300 # 5 minutes
def cached_remote_execute(executor, query: str) -> Any:
"""Execute remote query with caching."""
cache_key = f"{executor.url}:{hash(query)}"
current_time = time.time()
# Check cache
if cache_key in remote_cache:
cached_result, cached_time = remote_cache[cache_key]
if current_time - cached_time < CACHE_TTL:
return cached_result
# Execute and cache
result = executor.execute(query)
remote_cache[cache_key] = (result, current_time)
return resultMonitor remote service health:
@api.field
def service_health(self) -> dict:
"""Check health of remote services."""
services = {
"users": user_service,
"orders": order_service,
"products": product_service
}
health_status = {}
for name, service in services.items():
try:
result = service.execute('query { __schema { queryType { name } } }')
health_status[name] = "healthy" if not result.errors else "degraded"
except Exception:
health_status[name] = "unhealthy"
return health_statusRemote GraphQL capabilities enable building sophisticated distributed architectures while maintaining type safety and developer experience. They’re essential for microservices, API gateways, and modern distributed systems.