Pagination and Relay#
graphql-api provides comprehensive support for Relay-style cursor-based pagination, which is the GraphQL standard for efficiently handling large datasets. This includes the Node interface for global object identification and Connection-based pagination.
Understanding Relay Pagination#
Relay pagination uses a cursor-based approach that provides:
- Stable pagination: Results remain consistent even when data changes
- Efficient traversal: Forward and backward navigation through large datasets
- Rich metadata: Information about pagination state and boundaries
- GraphQL standard: Compatible with Relay, Apollo, and other GraphQL clients
Core Relay Concepts#
Node Interface#
The Node interface provides globally unique IDs for objects:
from graphql_api.relay import Node
@api.type
class User(Node):
def __init__(self, id: str, name: str, email: str):
self._id = id
self._name = name
self._email = email
# The `id` field is automatically provided by the Node interface
@api.field
def name(self) -> str:
return self._name
@api.field
def email(self) -> str:
return self._email
@classmethod
def get_node(cls, info, id):
"""
This method tells Relay how to fetch a User by its global ID.
In a real app, you would fetch the user from a database.
"""
# Example implementation
user_data = get_user_from_database(id)
if user_data:
return User(id=user_data.id, name=user_data.name, email=user_data.email)
return None
@api.type
class Post(Node):
def __init__(self, id: str, title: str, content: str, author_id: str):
self._id = id
self._title = title
self._content = content
self._author_id = author_id
@api.field
def title(self) -> str:
return self._title
@api.field
def content(self) -> str:
return self._content
@api.field
def author(self) -> User:
return User.get_node(None, self._author_id)
@classmethod
def get_node(cls, info, id):
post_data = get_post_from_database(id)
if post_data:
return Post(
id=post_data.id,
title=post_data.title,
content=post_data.content,
author_id=post_data.author_id
)
return NoneConnection, Edge, and PageInfo#
Relay pagination uses these key components:
from graphql_api.relay import Connection, Edge, PageInfo
from typing import List, Optional
import collections
class PersonConnection(Connection):
def __init__(self, people, *args, **kwargs):
super().__init__(*args, **kwargs)
# Convert data to cursor-based format
cursors = list(people.keys())
start_index = 0
end_index = len(cursors) - 1
self.has_previous_page = False
self.has_next_page = False
self.filtered_cursors = []
# Handle 'after' cursor - start after this cursor
if self._after is not None:
try:
start_index = cursors.index(self._after) + 1
if start_index > 0:
self.has_previous_page = True
except ValueError:
# Invalid cursor - start from beginning
start_index = 0
# Handle 'before' cursor - end before this cursor
if self._before is not None:
try:
end_index = cursors.index(self._before) - 1
if end_index < len(cursors) - 1:
self.has_next_page = True
except ValueError:
# Invalid cursor - go to end
end_index = len(cursors) - 1
# Apply cursor filtering
self.filtered_cursors = cursors[start_index:end_index + 1]
self.people = people
# Handle 'first' pagination - limit from start
if self._first is not None:
if len(self.filtered_cursors) > self._first:
self.filtered_cursors = self.filtered_cursors[:self._first]
self.has_next_page = True
# Handle 'last' pagination - limit from end
elif self._last is not None:
if len(self.filtered_cursors) > self._last:
self.filtered_cursors = self.filtered_cursors[-self._last:]
self.has_previous_page = True
@api.field
def edges(self) -> List[Edge]:
"""Return edges containing cursor and node for each item."""
return [
Edge(cursor=cursor, node=self.people[cursor])
for cursor in self.filtered_cursors
]
@api.field
def page_info(self) -> PageInfo:
"""Return pagination metadata."""
if not self.filtered_cursors:
return PageInfo(
start_cursor=None,
end_cursor=None,
has_previous_page=False,
has_next_page=False,
count=0
)
return PageInfo(
start_cursor=self.filtered_cursors[0],
end_cursor=self.filtered_cursors[-1],
has_previous_page=self.has_previous_page,
has_next_page=self.has_next_page,
count=len(self.filtered_cursors),
)Basic Pagination Implementation#
Here’s a complete example of Relay pagination:
from graphql_api.relay import Node, Connection, Edge, PageInfo
from typing import List, Optional
import collections
class Person(Node):
def __init__(self, name: Optional[str] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._name = name
@property
@api.field
def name(self) -> Optional[str]:
return self._name
@classmethod
def get_node(cls, info, id):
# In real implementation, fetch from database
people_data = {
"person_1": Person(name="Alice"),
"person_2": Person(name="Bob"),
"person_3": Person(name="Charlie"),
"person_4": Person(name="Diana"),
"person_5": Person(name="Eve"),
}
return people_data.get(id)
@api.type(is_root_type=True)
class Root:
@api.field
def people(
self,
before: Optional[str] = None,
after: Optional[str] = None,
first: Optional[int] = None,
last: Optional[int] = None,
) -> PersonConnection:
"""
Paginated list of people using Relay connection pattern.
Args:
before: Cursor to paginate before
after: Cursor to paginate after
first: Number of items to fetch from start
last: Number of items to fetch from end
"""
# Your data source - in real app, this would be from database
people_data = collections.OrderedDict([
("person_1", Person(name="Alice")),
("person_2", Person(name="Bob")),
("person_3", Person(name="Charlie")),
("person_4", Person(name="Diana")),
("person_5", Person(name="Eve")),
])
return PersonConnection(
people_data,
before=before,
after=after,
first=first,
last=last
)
@api.field
def person(self, id: str) -> Optional[Person]:
"""Get a person by their global ID."""
return Person.get_node(None, id)Database Integration#
Integrate Relay pagination with database queries:
class UserConnection(Connection):
def __init__(self, query_result, total_count, *args, **kwargs):
super().__init__(*args, **kwargs)
self.users = query_result
self.total_count = total_count
# Calculate pagination metadata
self.has_previous_page = self._after is not None
self.has_next_page = len(self.users) == (self._first or self._last or 0)
@api.field
def edges(self) -> List[Edge]:
return [
Edge(cursor=f"user_{user.id}", node=user)
for user in self.users
]
@api.field
def page_info(self) -> PageInfo:
if not self.users:
return PageInfo(
start_cursor=None,
end_cursor=None,
has_previous_page=False,
has_next_page=False,
count=0
)
return PageInfo(
start_cursor=f"user_{self.users[0].id}",
end_cursor=f"user_{self.users[-1].id}",
has_previous_page=self.has_previous_page,
has_next_page=self.has_next_page,
count=len(self.users)
)
@api.field
def total_count(self) -> int:
"""Total number of items available (not just in current page)."""
return self.total_count
@api.type(is_root_type=True)
class Root:
@api.field
def users(
self,
first: Optional[int] = None,
after: Optional[str] = None,
last: Optional[int] = None,
before: Optional[str] = None,
search: Optional[str] = None
) -> UserConnection:
"""
Paginated user list with search capability.
"""
# Parse cursor to get offset
offset = 0
if after:
# Extract ID from cursor (e.g., "user_123" -> 123)
try:
offset = int(after.split('_')[1])
except (IndexError, ValueError):
offset = 0
# Build database query
limit = first or last or 20 # Default page size
if last:
# For 'last', we need to adjust the query
pass # Implement reverse pagination logic
# Execute database query
query = db.session.query(User)
if search:
query = query.filter(User.name.ilike(f'%{search}%'))
# Get total count for metadata
total_count = query.count()
# Apply pagination
if after:
query = query.filter(User.id > offset)
users = query.order_by(User.id).limit(limit).all()
return UserConnection(
query_result=users,
total_count=total_count,
first=first,
after=after,
last=last,
before=before
)Advanced Pagination Patterns#
Async Database Pagination#
class AsyncUserConnection(Connection):
def __init__(self, users, has_more, *args, **kwargs):
super().__init__(*args, **kwargs)
self.users = users
self.has_more = has_more
@api.field
def edges(self) -> List[Edge]:
return [
Edge(cursor=f"user_{user.id}", node=user)
for user in self.users
]
@api.field
def page_info(self) -> PageInfo:
if not self.users:
return PageInfo(
start_cursor=None,
end_cursor=None,
has_previous_page=False,
has_next_page=False,
count=0
)
return PageInfo(
start_cursor=f"user_{self.users[0].id}",
end_cursor=f"user_{self.users[-1].id}",
has_previous_page=self._after is not None,
has_next_page=self.has_more,
count=len(self.users)
)
@api.type(is_root_type=True)
class Root:
@api.field
async def async_users(
self,
first: Optional[int] = 20,
after: Optional[str] = None
) -> AsyncUserConnection:
"""Async pagination with database."""
offset = 0
if after:
try:
offset = int(after.split('_')[1])
except (IndexError, ValueError):
offset = 0
# Async database query
async with get_db_session() as session:
# Fetch one extra to check if there are more
limit = first + 1
users = await session.execute(
select(User)
.where(User.id > offset)
.order_by(User.id)
.limit(limit)
)
users = users.scalars().all()
# Check if there are more results
has_more = len(users) > first
if has_more:
users = users[:-1] # Remove the extra item
return AsyncUserConnection(
users=users,
has_more=has_more,
first=first,
after=after
)Filtering and Sorting with Pagination#
from enum import Enum
class SortOrder(Enum):
ASC = "asc"
DESC = "desc"
class UserSortField(Enum):
NAME = "name"
CREATED_AT = "created_at"
EMAIL = "email"
@api.type(is_root_type=True)
class Root:
@api.field
def filtered_users(
self,
first: Optional[int] = 20,
after: Optional[str] = None,
filter: Optional[str] = None,
sort_field: UserSortField = UserSortField.CREATED_AT,
sort_order: SortOrder = SortOrder.DESC
) -> UserConnection:
"""
Advanced pagination with filtering and sorting.
"""
# Build query with filters
query = db.session.query(User)
if filter:
query = query.filter(
db.or_(
User.name.ilike(f'%{filter}%'),
User.email.ilike(f'%{filter}%')
)
)
# Apply sorting
sort_column = getattr(User, sort_field.value)
if sort_order == SortOrder.DESC:
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Apply cursor pagination
if after:
# Decode cursor and apply appropriate filter
cursor_data = decode_cursor(after)
query = apply_cursor_filter(query, cursor_data, sort_field, sort_order)
# Execute query
limit = first + 1 # Get one extra to check for more
users = query.limit(limit).all()
has_more = len(users) > first
if has_more:
users = users[:-1]
total_count = query.count()
return UserConnection(
query_result=users,
total_count=total_count,
first=first,
after=after
)Client Usage Examples#
Basic Pagination Queries#
# Get first 5 people
query {
people(first: 5) {
edges {
cursor
node {
id
name
}
}
pageInfo {
hasNextPage
hasPreviousPage
startCursor
endCursor
count
}
}
}
# Get next page after a cursor
query {
people(first: 5, after: "person_3") {
edges {
cursor
node {
id
name
}
}
pageInfo {
hasNextPage
endCursor
}
}
}
# Get last 3 people (reverse pagination)
query {
people(last: 3) {
edges {
cursor
node {
id
name
}
}
pageInfo {
hasPreviousPage
startCursor
}
}
}Advanced Queries with Filtering#
# Filtered and sorted pagination
query {
filteredUsers(
first: 10,
filter: "john",
sortField: NAME,
sortOrder: ASC
) {
edges {
node {
id
name
email
createdAt
}
}
pageInfo {
hasNextPage
endCursor
}
totalCount
}
}Best Practices#
Use stable cursors:
# ✅ Good: Use stable IDs as cursors
Edge(cursor=f"user_{user.id}", node=user)
# ❌ Avoid: Using array indices as cursors
Edge(cursor=str(index), node=user) # Unstable when data changesImplement efficient database queries:
# ✅ Good: Use database-level pagination
query.offset(offset).limit(limit)
# ❌ Avoid: Loading all data and paginating in memory
all_users = query.all()
paginated = all_users[offset:offset+limit] # Inefficient for large datasetsHandle edge cases gracefully:
def safe_cursor_decode(cursor: Optional[str]) -> int:
"""Safely decode cursor with fallback."""
if not cursor:
return 0
try:
return int(cursor.split('_')[1])
except (IndexError, ValueError):
return 0 # Fallback to beginningProvide helpful metadata:
@api.field
def enhanced_page_info(self) -> PageInfo:
return PageInfo(
start_cursor=self.start_cursor,
end_cursor=self.end_cursor,
has_previous_page=self.has_previous_page,
has_next_page=self.has_next_page,
count=len(self.current_page),
total_count=self.total_count, # Helpful for UI
page_number=self.calculate_page_number(), # Helpful for UI
)Relay pagination provides a robust, standardized approach to handling large datasets in GraphQL APIs, ensuring compatibility with modern GraphQL clients and excellent user experience.