Core Concepts¶
This page defines the fundamental concepts and data structures that form the foundation of OWASP Amass. Understanding these concepts is essential before diving into specific subsystems. For information about how these components are orchestrated together, see Architecture Overview. For details on how data flows through the system, see Data Flow and Processing Pipeline.
The core concepts covered here are:

- Events: Units of work that flow through the system
- Assets and Entities: Open Asset Model (OAM) assets wrapped in database entities
- Sessions: Isolated enumeration contexts with their own configuration and state
- Handlers: Callback functions registered by plugins to process specific asset types
- Pipelines: Priority-ordered chains of handlers for each asset type
- Transformations: Configuration rules that control which plugins process which assets
Events¶
Event Structure¶
An Event is the fundamental unit of work in Amass. Every discovery, enrichment, or processing action generates events that flow through the system.
graph LR
subgraph "Event Structure"
Event["et.Event"]
Event --> Name["Name: string<br/>'FQDN - example.com'"]
Event --> Entity["Entity: *dbt.Entity<br/>(wraps OAM Asset)"]
Event --> Meta["Meta: interface{}<br/>(optional metadata)"]
Event --> Dispatcher["Dispatcher: et.Dispatcher"]
Event --> Session["Session: et.Session"]
end
subgraph "Event Processing Wrapper"
EDE["et.EventDataElement"]
EDE --> EventRef["Event: *et.Event"]
EDE --> Error["Error: error"]
EDE --> Queue["Queue: chan *EventDataElement"]
end
Event -.wrapped in.-> EDE
Event Fields:

| Field | Type | Purpose |
|---|---|---|
| `Name` | `string` | Human-readable event identifier (e.g., `"FQDN - example.com"`) |
| `Entity` | `*dbt.Entity` | The asset being processed, wrapped in a database entity |
| `Meta` | `interface{}` | Optional metadata (e.g., `EmailMeta` for verification status) |
| `Dispatcher` | `et.Dispatcher` | Reference to the dispatcher for generating new events |
| `Session` | `et.Session` | The session context this event belongs to |
The EventDataElement wraps an event for pipeline processing. It includes an Error field to accumulate errors from handlers and a Queue channel for completion callbacks.
Event Lifecycle¶
Events are created when:

1. A user submits initial seed assets via the GraphQL API
2. Plugins discover new assets and dispatch new events
3. The session queue refills pipeline queues with pending work
graph TD
Create["Event Created"]
Create --> Validate["Validate Event<br/>dispatcher.safeDispatch()"]
Validate --> |"Has Entity?"| Check1{"Session Done?"}
Check1 --> |No| QueueCheck{"Already in<br/>Queue?"}
QueueCheck --> |No| AppendQueue["Append to Session Queue<br/>session.Queue().Append()"]
AppendQueue --> Stats["Increment WorkItemsTotal"]
Stats --> Dispatch["Append to Pipeline<br/>dispatcher.appendToPipeline()"]
Dispatch --> Pipeline["Asset Pipeline<br/>Priority 1-9 Handlers"]
Pipeline --> Handler1["Handler Execution"]
Handler1 --> |"May generate<br/>new events"| Create
Handler1 --> Complete["Completion Callback<br/>dispatcher.completedCallback()"]
Complete --> StatsUpdate["Increment WorkItemsCompleted"]
Check1 --> |Yes| Reject["Reject: Session Terminated"]
QueueCheck --> |Yes| Reject2["Skip: Duplicate"]
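The dispatch-time guards in the diagram (session terminated? duplicate?) can be sketched as one function. Everything here is simplified for illustration: the `session` struct, the string entity IDs, and the return values are assumptions, while the check order and the `WorkItemsTotal` counter follow the diagram above:

```go
package main

import "fmt"

// session is a simplified stand-in for et.Session: a done flag,
// a set of queued entity IDs, and the stats counter from the text.
type session struct {
	done   bool
	queued map[string]bool
	total  int // WorkItemsTotal
}

// safeDispatch mirrors the guard logic in the lifecycle diagram:
// reject if the session is terminated, skip duplicates, otherwise
// queue the entity and bump WorkItemsTotal.
func safeDispatch(s *session, entityID string) (accepted bool, reason string) {
	if s.done {
		return false, "session terminated"
	}
	if s.queued[entityID] {
		return false, "duplicate"
	}
	s.queued[entityID] = true
	s.total++
	return true, "queued"
}

func main() {
	s := &session{queued: make(map[string]bool)}
	fmt.Println(safeDispatch(s, "example.com")) // first dispatch is accepted
	fmt.Println(safeDispatch(s, "example.com")) // duplicate is skipped
	s.done = true
	fmt.Println(safeDispatch(s, "new.com")) // terminated session rejects
}
```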
Assets and Entities¶
Open Asset Model (OAM)¶
Amass uses the Open Asset Model (OAM) to represent discovered infrastructure. OAM defines standardized asset types with consistent properties and relationships. For comprehensive coverage of OAM, see Open Asset Model (OAM).
Core Asset Types:
graph TB
subgraph "Network Assets"
FQDN["oam.FQDN<br/>oamdns.FQDN{Name}"]
IP["oam.IPAddress<br/>oamnet.IPAddress{Address, Type}"]
Netblock["oam.Netblock<br/>oamnet.Netblock{CIDR, Type}"]
ASN["oam.AutonomousSystem<br/>oamnet.AutonomousSystem{Number}"]
end
subgraph "Organizational Assets"
Org["oam.Organization<br/>org.Organization{Name}"]
Contact["oam.ContactRecord<br/>contact.ContactRecord"]
Person["oam.Person<br/>people.Person{Name}"]
Location["oam.Location<br/>contact.Location{Address}"]
end
subgraph "Service Assets"
Service["oam.Service<br/>platform.Service{Port, Protocol}"]
TLS["oam.TLSCertificate<br/>oamcert.TLSCertificate"]
URL["oam.URL<br/>url.URL{Raw}"]
end
subgraph "Registration Assets"
Domain["oam.DomainRecord<br/>oamreg.DomainRecord"]
IPNet["oam.IPNetRecord<br/>oamreg.IPNetRecord"]
Autnum["oam.AutnumRecord<br/>oamreg.AutnumRecord"]
end
Sources: OAM import statements across codebase
Entity Wrapper¶
Every OAM asset is wrapped in a dbt.Entity structure from the asset-db library. This wrapper provides:
- Unique ID: Database identifier for the entity
- Asset: The OAM asset itself
- Relationships: Edges to other entities
The AssetData structure pairs an OAM asset with its type:
Assets are created from user input, where scope elements (domains, IPs, CIDRs, ASNs) are converted to OAM assets.
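The asset/type pairing can be sketched as follows. The field names, the `AssetType` string type, and the `Key()` method are assumptions for illustration, not the asset-db library's exact definitions:

```go
package main

import "fmt"

// AssetType names an OAM asset kind, e.g. "FQDN" or "IPAddress"
// (a stand-in for oam.AssetType).
type AssetType string

// Asset is the minimal behavior an OAM asset exposes in this sketch.
type Asset interface {
	Key() string
}

// FQDN is a simplified oamdns.FQDN.
type FQDN struct{ Name string }

func (f FQDN) Key() string { return f.Name }

// AssetData pairs an OAM asset with its declared type, as described above.
type AssetData struct {
	OAMAsset Asset
	OAMType  AssetType
}

func main() {
	d := AssetData{OAMAsset: FQDN{Name: "example.com"}, OAMType: "FQDN"}
	fmt.Printf("%s asset: %s\n", d.OAMType, d.OAMAsset.Key())
}
```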
Sessions¶
A Session represents an isolated enumeration context. Each session has its own configuration, scope, database connections, cache, and work queue. Multiple sessions can run concurrently within a single engine instance.
Session Structure¶
graph TB
subgraph "Session Components"
Session["sessions.Session"]
Session --> ID["id: uuid.UUID"]
Session --> Log["log: *slog.Logger"]
Session --> PubSub["ps: *pubsub.Logger"]
Session --> Config["cfg: *config.Config"]
Session --> Scope["scope: *scope.Scope"]
Session --> DB["db: repository.Repository"]
Session --> Cache["cache: *cache.Cache"]
Session --> Queue["queue: *sessionQueue"]
Session --> Stats["stats: *SessionStats"]
Session --> Done["done: chan struct{}"]
end
subgraph "Session Queue"
SQ["sessionQueue"]
SQ --> QDB["db: *queuedb.QueueDB<br/>(SQLite)"]
end
subgraph "Session Stats"
Stats2["SessionStats"]
Stats2 --> Total["WorkItemsTotal: int"]
Stats2 --> Completed["WorkItemsCompleted: int"]
end
Queue --> SQ
Stats --> Stats2
Session Lifecycle¶
Sessions are managed by the SessionManager:
stateDiagram-v2
[*] --> Creating: NewSession(cfg)
Creating --> Initializing: CreateSession()
Initializing --> SetupDB: setupDB()
SetupDB --> CreateTmpDir: createTemporaryDir()
CreateTmpDir --> CreateCache: createFileCacheRepo()
CreateCache --> CreateQueue: newSessionQueue()
CreateQueue --> Active: Session Ready
Active --> Processing: Events Dispatched
Processing --> Active: Work Continues
Active --> Terminating: CancelSession(id)
Terminating --> WaitComplete: Wait for WorkItems
WaitComplete --> Cleanup: Close Queue/Cache/DB
Cleanup --> RemoveTmpDir: Remove Temp Directory
RemoveTmpDir --> [*]
Key Methods:

| Method | Purpose |
|---|---|
| `CreateSession()` | Initialize a new session with config |
| `ID()` | Get session UUID |
| `Config()` | Access session configuration |
| `DB()` | Get primary database repository |
| `Cache()` | Get session-specific cache |
| `Queue()` | Get work queue for tracking entities |
| `Stats()` | Get processing statistics |
| `Done()` | Check if session is terminated |
| `Kill()` | Terminate session |
Session Queue¶
Each session has a dedicated work queue backed by SQLite. The queue tracks which entities have been scheduled for processing and which have been completed.
Queue Database Schema:
erDiagram
Element {
uint64 ID PK
time CreatedAt
time UpdatedAt
string Type "oam.AssetType"
string EntityID "dbt.Entity.ID"
bool Processed
}
Queue Operations:

| Method | Purpose |
|---|---|
| `Has(e *dbt.Entity)` | Check if entity is already queued |
| `Append(e *dbt.Entity)` | Add entity to queue |
| `Next(atype, num)` | Get next batch of unprocessed entities |
| `Processed(e *dbt.Entity)` | Mark entity as processed |
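The four operations can be sketched against an in-memory stand-in for the SQLite-backed queue. String entity IDs and asset-type names replace the real `*dbt.Entity` and `oam.AssetType` arguments, and the row layout follows the schema above:

```go
package main

import "fmt"

// element mirrors the Element row in the schema above.
type element struct {
	entityID  string
	atype     string
	processed bool
}

// sessionQueue is an in-memory stand-in for the SQLite-backed queue.
type sessionQueue struct {
	elements []*element
}

// Has reports whether the entity has already been queued.
func (q *sessionQueue) Has(entityID string) bool {
	for _, e := range q.elements {
		if e.entityID == entityID {
			return true
		}
	}
	return false
}

// Append adds an entity to the queue if it is not already present.
func (q *sessionQueue) Append(entityID, atype string) {
	if !q.Has(entityID) {
		q.elements = append(q.elements, &element{entityID: entityID, atype: atype})
	}
}

// Next returns up to num unprocessed entity IDs of the given asset type.
func (q *sessionQueue) Next(atype string, num int) []string {
	var batch []string
	for _, e := range q.elements {
		if len(batch) == num {
			break
		}
		if e.atype == atype && !e.processed {
			batch = append(batch, e.entityID)
		}
	}
	return batch
}

// Processed marks an entity as completed.
func (q *sessionQueue) Processed(entityID string) {
	for _, e := range q.elements {
		if e.entityID == entityID {
			e.processed = true
		}
	}
}

func main() {
	q := &sessionQueue{}
	q.Append("example.com", "FQDN")
	q.Append("192.0.2.1", "IPAddress")
	fmt.Println(q.Next("FQDN", 10))
	q.Processed("example.com")
	fmt.Println(q.Next("FQDN", 10))
}
```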
Handlers and Plugins¶
Handler Structure¶
A Handler is a callback function registered by a plugin to process specific asset types. Handlers are the actual processing units that examine assets and generate new discoveries.
Handler Definition:
type Handler struct {
Plugin Plugin // Owner plugin
Name string // Handler identifier
Priority int // Execution priority (1-9)
MaxInstances int // Concurrency limit (0 = unlimited)
EventType oam.AssetType // Asset type this handler processes
Transforms []string // Transformation types produced
Callback func(*Event) error // Processing function
}
Priority System¶
Handlers execute in priority order from 1 (highest) to 9 (lowest). This ensures critical operations happen before dependent operations:
graph LR
subgraph "Priority Levels"
P1["Priority 1<br/>DNS TXT Records<br/>(Org Discovery)"]
P2["Priority 2<br/>DNS CNAME<br/>(Alias Resolution)"]
P3["Priority 3<br/>DNS A/AAAA<br/>(IP Discovery)"]
P4["Priority 4<br/>DNS NS/MX/SRV<br/>(Subdomain Enum)"]
P5["Priority 5<br/>DNS Apex<br/>(Domain Hierarchy)"]
P6["Priority 6<br/>Company Search<br/>(API Queries)"]
P7["Priority 7<br/>Company Enrich<br/>(Funding/Employees)"]
P8["Priority 8<br/>DNS Reverse<br/>(PTR Lookups)"]
P9["Priority 9<br/>Service Discovery<br/>(HTTP/TLS Probes)"]
end
P1 --> P2 --> P3 --> P4 --> P5 --> P6 --> P7 --> P8 --> P9
Rationale: DNS TXT records may contain organization identifiers (priority 1), which enable CNAME resolution (priority 2), which leads to IP addresses (priority 3), which enable service discovery (priority 9).
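Building a pipeline from registered handlers is essentially a stable sort on `Priority`. A sketch using only the relevant fields from the `Handler` struct shown earlier (the handler names are invented for illustration):

```go
package main

import (
	"fmt"
	"sort"
)

// Handler carries only the fields needed for ordering
// (see the full struct definition above).
type Handler struct {
	Name     string
	Priority int // 1 = highest, 9 = lowest
}

// orderByPriority returns handlers sorted from priority 1 to 9,
// keeping registration order among handlers of equal priority.
func orderByPriority(handlers []Handler) []Handler {
	out := append([]Handler(nil), handlers...)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].Priority < out[j].Priority
	})
	return out
}

func main() {
	handlers := []Handler{
		{Name: "service-discovery", Priority: 9},
		{Name: "dns-a-aaaa", Priority: 3},
		{Name: "dns-txt", Priority: 1},
	}
	for _, h := range orderByPriority(handlers) {
		fmt.Printf("P%d %s\n", h.Priority, h.Name)
	}
}
```

A stable sort matters here: handlers registered at the same priority keep their registration order instead of being shuffled.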
Plugin Interface¶
Plugins implement the Plugin interface:
type Plugin interface {
Name() string
Start(r Registry) error // Register handlers with registry
Stop()
}
During Start(), plugins register one or more handlers with the registry. Example registration pattern:
r.RegisterHandler(&Handler{
Plugin: plugin,
Name: "dns-txt-handler",
Priority: 1,
EventType: oam.FQDN,
Transforms: []string{"to-organization"},
Callback: plugin.handleTXT,
})
Asset Pipelines¶
Pipeline Construction¶
For each OAM asset type, the registry builds an Asset Pipeline consisting of all registered handlers for that type, ordered by priority.
graph TB
subgraph "Asset Pipeline for oam.FQDN"
Input["PipelineQueue<br/>et.PipelineQueue"]
Input --> Stage1["Priority 1 Stage<br/>DNS TXT Handler"]
Stage1 --> Stage2["Priority 2 Stage<br/>DNS CNAME Handler"]
Stage2 --> Stage3["Priority 3 Stage<br/>DNS A/AAAA Handler"]
Stage3 --> Stage4["Priority 4 Stage<br/>DNS NS/MX/SRV Handler"]
Stage4 --> Stage5["Priority 5 Stage<br/>DNS Apex Handler"]
Stage5 --> Sink["Sink<br/>Completion Callback"]
end
subgraph "Stage Types"
FIFO["FIFO<br/>(MaxInstances = 0)"]
FixedPool["FixedPool<br/>(MaxInstances > 0)"]
Parallel["Parallel<br/>(Multiple handlers<br/>same priority)"]
end
Pipeline Stage Types:

| Stage Type | When Used | Behavior |
|---|---|---|
| `FIFO` | Single handler, `MaxInstances = 0` | Serial processing, unlimited goroutines |
| `FixedPool` | Single handler, `MaxInstances > 0` | Concurrent processing, limited pool |
| `Parallel` | Multiple handlers, same priority | All handlers run concurrently |
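The table's selection rule reduces to a single decision function. This is a sketch with stage names as strings; the real registry constructs concrete pipeline stage types rather than returning labels:

```go
package main

import "fmt"

// stageFor picks the pipeline stage type from the number of handlers
// registered at a priority level and the handler's MaxInstances,
// following the table above.
func stageFor(handlerCount, maxInstances int) string {
	switch {
	case handlerCount > 1:
		return "Parallel" // multiple handlers at the same priority
	case maxInstances > 0:
		return "FixedPool" // single handler with a concurrency limit
	default:
		return "FIFO" // single handler, no limit
	}
}

func main() {
	fmt.Println(stageFor(1, 0))
	fmt.Println(stageFor(1, 4))
	fmt.Println(stageFor(3, 0))
}
```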
Pipeline Execution¶
Pipelines execute continuously in the background:
sequenceDiagram
participant Q as PipelineQueue
participant P as Pipeline
participant H as Handler
participant S as Sink
loop Continuous Processing
Q->>P: Next element available?
P->>Q: Get EventDataElement
Q-->>P: Return element
P->>H: Execute handlerTask()
H->>H: Check session.Done()
H->>H: Check transformations
H->>H: Execute callback()
H-->>P: Return result
P->>S: Send to sink
S->>S: Post to completion queue
end
Handler Execution:

1. Extract EventDataElement: Validate pipeline data
2. Check Session: Skip if session terminated
3. Check Transformations: Apply config filters
4. Execute Callback: Run handler's processing function
5. Error Handling: Accumulate errors in EventDataElement
Transformations¶
Transformations are configuration rules that control which plugins can process which asset types. They provide fine-grained control over the discovery pipeline.
Transformation Rules¶
Transformations are defined in `config.yaml`:
Each transformation specifies:
- From: Source asset type (e.g., FQDN, IPAddress)
- To: Target plugin name or all
- Exclude: Plugins to exclude when using all
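A hypothetical fragment showing how these three fields might appear. The rule keys and plugin names here are placeholders, not the project's actual syntax; consult the example `config.yaml` shipped with Amass for the exact schema:

```yaml
# Illustrative only -- keys and plugin names are placeholders.
transformations:
  fqdn-to-everything:
    from: FQDN
    to: all
    exclude: [some_slow_plugin]
  ip-to-netblock:
    from: IPAddress
    to: some_netblock_plugin
```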
Transformation Matching¶
When a handler executes, the system checks if it's allowed to process the current asset:
flowchart TD
Start["Handler Execution"]
Start --> HasTrans{"Transformations<br/>defined for<br/>asset type?"}
HasTrans --> |No| Execute["Execute Handler"]
HasTrans --> |Yes| AllExclude{"Is plugin in<br/>'all' exclude list?"}
AllExclude --> |Yes| Skip["Skip Handler"]
AllExclude --> |No| PluginMatch{"Is plugin<br/>explicitly<br/>listed in 'to'?"}
PluginMatch --> |Yes| Execute
PluginMatch --> |No| TransMatch{"Does plugin produce<br/>transformation<br/>in config?"}
TransMatch --> |Yes| Execute
TransMatch --> |No| Skip
Matching Logic:

1. Get all transformations for the asset type
2. Check if the plugin is excluded via the `all` exclusion list
3. Check if the plugin name matches the `to` field
4. Check if the handler's `Transforms` intersect with the configured transformations
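The matching steps translate into a small predicate. This is a sketch following the flowchart above; the `Transformation` struct uses the From/To/Exclude fields described earlier, and everything else is simplified:

```go
package main

import "fmt"

// Transformation mirrors the From/To/Exclude fields described above.
type Transformation struct {
	From    string
	To      string
	Exclude []string
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// handlerAllowed applies the matching steps: with no rules for the asset
// type the handler runs; an "all" rule admits every plugin except those
// excluded; an explicit "to" match admits the plugin; otherwise the
// handler's produced transforms must intersect the configured targets.
func handlerAllowed(rules []Transformation, assetType, plugin string, transforms []string) bool {
	var matched, any bool
	for _, r := range rules {
		if r.From != assetType {
			continue
		}
		any = true
		if r.To == "all" {
			if contains(r.Exclude, plugin) {
				return false // excluded from the catch-all rule
			}
			matched = true
			continue
		}
		if r.To == plugin || contains(transforms, r.To) {
			matched = true
		}
	}
	if !any {
		return true // no transformations defined for this asset type
	}
	return matched
}

func main() {
	rules := []Transformation{{From: "FQDN", To: "all", Exclude: []string{"slow_plugin"}}}
	fmt.Println(handlerAllowed(rules, "FQDN", "dns_plugin", nil))
	fmt.Println(handlerAllowed(rules, "FQDN", "slow_plugin", nil))
}
```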
Complete Event Flow¶
This diagram shows how all core concepts work together:
graph TB
subgraph "1. Event Creation"
Input["User Input / Plugin Discovery"]
Input --> CreateEvent["Create et.Event"]
CreateEvent --> Event["Event{<br/>Name, Entity,<br/>Session, Dispatcher}"]
end
subgraph "2. Dispatch & Queue"
Event --> Dispatch["dispatcher.DispatchEvent()"]
Dispatch --> Validate{"Validate<br/>Event"}
Validate --> |Valid| CheckDup{"Already<br/>in Queue?"}
CheckDup --> |No| QAppend["session.Queue().Append()"]
QAppend --> QDB[("queue.db<br/>SQLite")]
QDB --> WaitFill["Wait for<br/>fillPipelineQueues()"]
end
subgraph "3. Pipeline Processing"
WaitFill --> GetNext["session.Queue().Next()"]
GetNext --> Wrap["Wrap in<br/>EventDataElement"]
Wrap --> APQueue["AssetPipeline.Queue"]
APQueue --> Pipeline["Pipeline Execution"]
Pipeline --> P1["Priority 1<br/>Handlers"]
P1 --> P2["Priority 2<br/>Handlers"]
P2 --> Pn["Priority N<br/>Handlers"]
end
subgraph "4. Handler Processing"
Pn --> Handler["Handler.Callback()"]
Handler --> CheckTrans["Check<br/>Transformations"]
CheckTrans --> |Allowed| Execute["Execute Logic"]
Execute --> NewEvents["Generate<br/>New Events?"]
NewEvents --> |Yes| Dispatch
end
subgraph "5. Completion"
Pn --> Sink["Pipeline Sink"]
Sink --> Complete["Completion Callback"]
Complete --> UpdateStats["Increment<br/>WorkItemsCompleted"]
Complete --> Mark["Mark Processed<br/>in Queue"]
end
CheckDup --> |Yes| Skip["Skip: Duplicate"]
Validate --> |Invalid| Reject["Reject"]
Key Points:

1. Events can generate new events recursively (discovery cascade)
2. The session queue prevents duplicate processing
3. Pipelines execute handlers in priority order
4. Transformations filter which handlers execute
5. Completion callbacks track progress statistics