Events
Nitro's event system decouples business logic from side effects. When an entity does something (places an order, completes a task), it announces "something happened" without knowing who's listening. Handlers respond independently — sending emails, updating analytics, triggering webhooks — without polluting entity code.
Built on Blinker with added async support, priority execution, conditional handlers, and cancellation.
Defining Events
from nitro import event
# Create or retrieve a named event (same name = same instance)
todo_created = event('todo-created')
# With documentation
todo_completed = event('todo-completed', doc="Fired when a todo is marked complete")
Use dot notation (todo.created) or hyphens (todo-created) — both work. Be descriptive: todo-created is better than done.
Connecting Handlers
Use the @on decorator to register handler functions:
from nitro import event, on
todo_created = event('todo-created')
@on('todo-created')
def log_creation(sender, **kwargs):
print(f"New todo: {sender.title}")
@on('todo-created')
async def notify_user(sender, **kwargs):
await NotificationService.send(sender.user_email, f"New todo: {sender.title}")
Handlers must accept sender and **kwargs. The sender is whatever object emitted the event (usually an entity instance). Extra data passed during emission arrives in kwargs. Pass the event name as a string to @on().
Handler Types
The event system auto-detects four handler types:
from nitro import on
# Sync function
@on('todo-completed')
def log_todo(sender, **kwargs):
print(f"Todo {sender.id} completed")
return "logged"
# Async function — runs without blocking
@on('todo-completed')
async def send_email(sender, **kwargs):
await email_service.send(sender.user_email, "Todo completed!")
return "email sent"
# Sync generator — yields multiple results
@on('todo-completed')
def process_related(sender, **kwargs):
for related in sender.related_todos:
yield process(related)
# Async generator
@on('todo-completed')
async def stream_updates(sender, **kwargs):
async for update in async_process_todo(sender):
yield update
Emitting Events
emit() — Synchronous
Calls sync handlers immediately. Async handlers are scheduled (fire-and-forget).
from nitro import Entity, event, emit
todo_completed = event('todo-completed')
class Todo(Entity, table=True):
title: str
completed: bool = False
def complete(self):
self.completed = True
self.save()
emit(todo_completed, self)
def complete_with_data(self):
self.completed = True
self.save()
# Pass extra data to handlers via kwargs
emit(todo_completed, self, points=10, reason="manual")
Handlers receive extra kwargs:
from nitro import on
@on('todo-completed')
def apply_rewards(sender, points=0, **kwargs):
print(f"Awarded {points} points for completing {sender.title}")
emit_async() — Await All Handlers
Runs all handlers in parallel and waits for results. Use in async code when you need confirmation that all handlers completed.
from nitro import Entity, event, emit_async, on
todo_completed = event('todo-completed')
@on('todo-completed')
async def send_email(sender, **kwargs):
await email_service.send(sender.user_email, "Done!")
return "email sent"
@on('todo-completed')
async def update_stats(sender, **kwargs):
await stats_service.increment("completed")
return "stats updated"
class Todo(Entity, table=True):
title: str
completed: bool = False
async def complete(self):
self.completed = True
self.save()
results = await emit_async(todo_completed, self)
# results: [["email sent"], ["stats updated"]]
emit() | emit_async() | |
|---|---|---|
| Async handlers | Scheduled (fire-and-forget) | Awaited in parallel |
| Sync handlers | Executed immediately | Executed immediately |
| Returns | Sync results only | All results |
| Use in | Sync or async code | Async code only |
Priority and Conditions
Priority
Higher priority runs first. Default is 0.
from nitro import on
@on('todo-created', priority=10)
def validate(sender, **kwargs):
"""Runs first — validates data"""
if not sender.title:
return False # Cancels remaining handlers
@on('todo-created', priority=5)
def enrich(sender, **kwargs):
"""Runs second"""
if not sender.assignee:
sender.assignee = "unassigned"
sender.save()
@on('todo-created', priority=0)
def notify(sender, **kwargs):
"""Runs last (default priority)"""
print(f"New todo: {sender.title}")
When a handler returns False, all remaining handlers are cancelled.
Conditional Handlers
Only execute when a condition is met:
from nitro import on
@on('todo-created', condition=lambda sender, **kw: sender.priority >= 8)
def urgent_alert(sender, **kwargs):
"""Only fires for high-priority todos"""
print(f"URGENT: {sender.title}")
# Or use a named function for complex conditions
def is_overdue(sender, **kwargs):
return sender.due_date and sender.due_date < datetime.now()
@on('todo-updated', condition=is_overdue)
def overdue_warning(sender, **kwargs):
print(f"Overdue: {sender.title}")
Conditions must accept sender and **kwargs and return a boolean. Exceptions in conditions skip the handler silently.
When NOT to Use Events
Events are for side effects (notifications, logging, analytics). Don't use them for:
Request-response patterns — use direct function calls:
todo = Todo.get(id)Calculations that return values — use methods:
priority = PriorityService.calculate(todo)Core business logic — keep validation and state changes in entity methods
from nitro import Entity, event, emit
todo_completed = event('todo-completed')
class Todo(Entity, table=True):
title: str
completed: bool = False
def complete(self):
# Business logic stays in the entity
if not self.title:
raise ValueError("Cannot complete a todo without a title")
self.completed = True
self.save()
# Only side effects go through events
emit(todo_completed, self)