Cuetopia Technical Reference
This document provides detailed technical information about the Cuetopia monitoring system’s internal architecture, data processing, and RPC communication patterns.
Architecture Overview
Cuetopia consists of three main plugins that communicate through Qt signals and share data via RPC calls to the Cuebot server.
┌─────────────────────────────────────────────────────────┐
│ CueGUI Application │
├─────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Monitor Jobs │ │ Job Details │ │ Job Graph │ │
│ │ Plugin │←→│ Plugin │←→│ Plugin │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ ↓ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ RPC Communication Layer │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────┬───────────────────────────────┘
↓
┌──────────┐
│ Cuebot │
│ Server │
└──────────┘
Plugin Components
Monitor Jobs Plugin (MonitorJobsPlugin.py
)
Class Structure
MonitorJobsDockWidget (Main container)
├── JobMonitorTree (Tree widget)
│ ├── JobWidgetItem (Individual job items)
│ ├── MenuActions (Context menus)
│ └── ItemDelegate (Custom rendering)
├── JobRegexLoadEditBox (Search field)
└── JobLoadFinishedCheckBox (Filter control)
Key Methods and Data Flow
- Job Loading Process
_regexLoadJobsHandle() -> opencue.api.getJobs() -> addJob() -> JobWidgetItem creation
- Update Cycle
Timer (10s) -> _update() -> updateItem() -> RPC calls -> UI refresh
- State Calculation
displayState(job): if job.data.state == FINISHED: return "Finished" if job.data.is_paused: return "Paused" if job.data.job_stats.dead_frames > 0: return "Failing" if all_pending_are_dependent: return "Dependency" return "In Progress"
Monitor Job Details Plugin (MonitorJobDetailsPlugin.py
)
Component Architecture
MonitorLayerFramesDockWidget
├── LayerMonitorTree
│ ├── LayerWidgetItem
│ └── Layer statistics processing
└── FrameMonitor
├── FrameMonitorTree
├── FrameRangeSelection
└── Filter controls
Frame Data Processing
- Frame Loading
setJob(job) -> job.getFrames(search) -> FrameWidgetItem creation
Search parameters:
- limit: 500 frames
- offset: page * 500
- status_filter: selected states
- layer_filter: selected layers
- Log Data Buffering
FrameLogDataBuffer:
- Caches last log line
- Calculates LLU (Last Log Update)
- 15-second cache timeout
- Thread pool for async reads
- ETA Calculation
```python
FrameEtaDataBuffer:
- Historical frame times
- Rolling average calculation
- Per-layer estimations
- Updates every 30 seconds ```
Job Graph Plugin (MonitorJobGraphPlugin.py
)
Graph Construction
createGraph():
for layer in job.getLayers():
node = CueLayerNode(layer)
graph.add_node(node)
for node in graph.nodes:
for depend in node.getWhatDependsOnThis():
child = graph.get_node(depend.dependErLayer())
child.set_input(node.output)
Node Updates
update() -> job.getLayers() -> node.setRpcObject(layer) -> visual refresh
Update frequency: 20 seconds
RPC Communication Patterns
Job Data Retrieval
Basic Job Information
# Single job fetch
job = opencue.api.findJob(job_name)
Returns: Job object with nested data structure
# Multiple jobs with pattern
jobs = opencue.api.getJobs(regex=["pattern"], include_finished=True)
Returns: List of Job objects
Job Data Structure
Job {
JobData data {
string name
string id
State state
bool is_paused
int64 start_time
int64 stop_time
bool has_comment
bool auto_eat
}
JobStats job_stats {
int32 total_frames
int32 succeeded_frames
int32 running_frames
int32 dead_frames
int32 eaten_frames
int32 waiting_frames
int32 depend_frames
int64 max_rss
}
}
Layer Data Access
Layer Fetching
layers = job.getLayers()
Returns: List of Layer objects with statistics
Layer Data Structure
Layer {
LayerData data {
string name
string range
int32 chunk_size
int32 dispatch_order
repeated string services
repeated string limits
float min_cores
int64 min_memory
int32 min_gpus
int64 min_gpu_memory
repeated string tags
int32 timeout
int32 timeout_llu
}
LayerStats layer_stats {
int32 total_frames
int32 succeeded_frames
int32 running_frames
int32 dead_frames
int32 eaten_frames
int32 waiting_frames
int32 depend_frames
int64 max_rss
float avg_frame_sec
}
}
Frame Data Operations
Frame Search
frames = job.getFrames(
status=["RUNNING", "DEAD"], # Status filter
layer=["layer_name"], # Layer filter
offset=0, # Pagination offset
limit=500 # Max results
)
Frame Data Fields
Frame {
FrameData data {
int32 number
string layer_name
int32 dispatch_order
State state
string frame_state_display_override
int32 retry_count
string last_resource
int64 start_time
int64 stop_time
int64 used_memory
int64 max_rss
int64 used_gpu_memory
int64 max_gpu_memory
CheckpointState checkpoint_state
int32 checkpoint_count
}
}
Data Update Strategies
Pull-Based Updates
- Timer-Driven Refresh
QTimer -> timeout signal -> _update() method -> RPC calls
Intervals:
- Job Monitor: 10 seconds
- Frame Monitor: On demand
- Job Graph: 20 seconds
- Event-Driven Updates
User action -> Signal emission -> Connected slots -> RPC refresh
Examples:
- Job selection -> view_object signal
- Filter change -> immediate refresh
- Manual refresh -> F5 key handler
Optimization Techniques
Batch Operations
# Instead of individual calls
for job_id in job_ids:
job = opencue.api.getJob(job_id)
# Use batch retrieval
jobs = opencue.api.getJobs(id=job_ids)
Differential Updates
# Only update changed fields
if new_data.job_stats != cached_data.job_stats:
update_statistics_display()
Lazy Loading
# Load only visible items
visible_range = tree.visibleRange()
for item in visible_range:
item.updateData()
Memory Management
Object Lifecycle
- Job Proxy Storage
class JobMonitorTree:
def __init__(self):
self.__jobTimeLoaded = {} # Job ID → timestamp
self.__jobs = {} # Job ID → Job proxy
def addJob(self, job_id, timestamp):
if self.isExpired(timestamp):
return # Don't load expired jobs
self.__jobs[job_id] = opencue.api.getJob(job_id)
- Weak References
# Prevent circular references
self.parent = weakref.proxy(parent)
- Cache Expiration
JOB_RESTORE_THRESHOLD_DAYS = 3
JOB_RESTORE_THRESHOLD_LIMIT = 200
def restoreJobs(self, job_list):
today = datetime.now()
for job_id, timestamp in job_list[:LIMIT]:
if (today - timestamp).days <= THRESHOLD_DAYS:
self.addJob(job_id)
Signal and Slot Connections
Core Signals
# Job selection signal
view_object = QtCore.Signal(object)
# Job removal signal
unmonitor = QtCore.Signal(object)
# Facility change signal
facility_changed = QtCore.Signal()
# Layer filter signal
handle_filter_layers_byLayer = QtCore.Signal(list)
# Job changed signal
job_changed = QtCore.Signal()
Signal Flow Example
# User double-clicks job
JobMonitorTree.itemDoubleClicked ->
emit view_object(job) ->
MonitorJobDetails.__setJob(job) ->
LayerMonitorTree.setJob(job)
FrameMonitor.setJob(job)
JobGraph.setJob(job) ->
createGraph()
Performance Metrics
RPC Call Frequency
Operation | Frequency | Data Volume |
---|---|---|
Job list refresh | 10 seconds | ~50-200 jobs |
Layer fetch | On job selection | ~5-50 layers |
Frame fetch | On demand | 500 frames max |
Log read | On frame click | Single file |
Graph update | 20 seconds | Layer count |
Memory Usage
Component | Typical Usage | Maximum |
---|---|---|
Job cache | 10-50 MB | 200 MB |
Frame data | 5-20 MB | 100 MB |
Log buffer | 1-5 MB | 20 MB |
Graph nodes | 1-10 MB | 50 MB |
Configuration Files
Settings Storage
Location: ~/.config/opencue/cuegui.ini
[MonitorJobs]
columnWidths = 470,20,20,80,90,60,50,50,60,55,50,100,100,0
columnVisibility = true,true,true,true,true,true,true,true,true,true,true,true,true,true
jobs = Job.id1:timestamp,Job.id2:timestamp
loadFinished = false
grpDependentCb = true
autoLoadMineCb = true
[MonitorJobDetails]
splitterSize = 300,400
frameColumnWidths = 60,70,250,100,55,55,120,55,20,55,70,70,60,60,70,100,100,0
layerColumnWidths = 0,250,100,100,150,45,60,45,40,60,40,40,40,53,40,40,40,65,100,100,45,45
Error Handling
RPC Error Recovery
def safeRpcCall(func, *args, **kwargs):
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except opencue.EntityNotFoundException:
logger.warning(f"Entity not found: {args}")
return None
except opencue.CueException as e:
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt))
continue
logger.error(f"RPC failed: {e}")
raise
Data Validation
def validateJobName(name):
# Pattern: show-shot-user_descriptor
pattern = r'^[a-z0-9_]+-[a-z0-9\.]+-[a-z0-9_]+.*$'
return re.match(pattern, name, re.IGNORECASE)
def validateFrameRange(range_str):
try:
FileSequence.FrameSet(range_str)
return True
except:
return False
Extension Points
Custom Column Addition
# In JobMonitorTree.__init__()
self.addColumn("CustomField", 100, id=50,
data=lambda job: job.customData(),
sort=lambda job: job.customSort(),
tip="Custom field tooltip"
)
Custom Delegates
class CustomProgressDelegate(QtWidgets.QStyledItemDelegate):
def paint(self, painter, option, index):
# Custom rendering logic
progress = index.data()
# Draw custom progress visualization
Plugin Hooks
# Register custom action
menuActions.addAction("Custom Action", self.customHandler)
# Add to context menu
def customHandler(self, rpcObjects):
for obj in rpcObjects:
# Perform custom operation
pass
Debugging and Profiling
Enable Debug Logging
# In cuegui/Logger.py
logging.basicConfig(level=logging.DEBUG)
RPC Call Tracing
# Enable RPC debug mode
os.environ['OPENCUE_DEBUG_RPC'] = '1'