
Chapter 4.2 - SIEM, SOAR & Detection Engineering

Module 4: Defense Engineering & Hardening

Prerequisites: Chapter 4.1 (Firewall Architecture, Segmentation & Zero Trust)


Table of Contents

  1. SIEM Architecture - Ingestion, Normalization & Correlation
  2. Log Sources & Collection - What to Ingest and Why
  3. Detection Engineering - Sigma, YARA & Rule Development
  4. Elastic Stack (ELK) - Deployment, Pipelines & Detection
  5. Splunk - SPL, Correlation Searches & Dashboards
  6. Behavioral Analytics - UBA, Baselining & Anomaly Detection
  7. SOAR - Playbook Automation & Orchestration
  8. Alert Triage & False Positive Management
  9. Detection Coverage Mapping - ATT&CK & Gap Analysis
  10. MITRE ATT&CK Mapping

1. SIEM Architecture - Ingestion, Normalization & Correlation

A Security Information and Event Management (SIEM) system centralizes log collection, normalizes disparate data formats, stores events for retention and investigation, and runs correlation rules to generate actionable alerts. A SIEM is only as good as the telemetry it ingests and the detection logic applied to that telemetry - a misconfigured or poorly tuned SIEM generates noise, not intelligence.

Core SIEM Pipeline

Log Sources → Collection → Transport → Parsing/Normalization → Indexing → Correlation → Alerting

Each stage introduces potential failure points:

Stage | Common Failure | Consequence
Collection | Agent not installed, log rotation before collection | Missing telemetry - blind spots
Transport | Syslog UDP loss, TLS misconfiguration | Data loss at volume; cleartext in transit
Parsing | Wrong parser for log format, vendor version change | Fields not extracted - correlation rules break
Normalization | Inconsistent field names (src_ip vs sourceAddress) | Cross-source correlation impossible
Indexing | Storage full, hot tier exhausted | Recent events not searchable
Correlation | Overly broad rules, no baselining | Alert fatigue; real alerts buried
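
The normalization failure in the table can be illustrated with a minimal Python sketch. The alias map and the target names (`source.ip`, `destination.ip`) are assumptions chosen for illustration, loosely following ECS-style naming; a real deployment would use the schema of its own SIEM.

```python
# Minimal sketch: rename vendor-specific fields onto one common schema so that
# cross-source correlation can key on a single field name.
# FIELD_ALIASES is a hypothetical mapping, not taken from any product.
FIELD_ALIASES = {
    "src_ip": "source.ip",
    "sourceAddress": "source.ip",
    "src": "source.ip",
    "dst_ip": "destination.ip",
    "destinationAddress": "destination.ip",
}


def normalize(event: dict) -> dict:
    """Return a copy of event with known aliases renamed; unknown fields pass through."""
    return {FIELD_ALIASES.get(key, key): value for key, value in event.items()}


firewall_event = {"src_ip": "10.0.0.5", "dst_ip": "203.0.113.42", "action": "allow"}
proxy_event = {"sourceAddress": "10.0.0.5", "url": "http://example.test/"}

normalized = [normalize(firewall_event), normalize(proxy_event)]

# Both events now expose the same key, so they can be joined on it
shared_sources = {event["source.ip"] for event in normalized}
print(shared_sources)
```

Without the renaming step, a correlation rule keyed on one of the two original field names would silently miss the other source.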

Data Volume Planning

# Estimate daily log volume before deploying
# Average log rates by source type:
# Windows Event Log: ~500-2000 events/host/day (endpoint)
# Active Directory DC: ~50,000-200,000 events/day
# Firewall: ~100MB-5GB/day depending on traffic
# Web proxy: ~1-10GB/day
# DNS server: ~500MB-2GB/day
# EDR (CrowdStrike): ~1-5GB/host/day (raw telemetry)

# Calculate total:
python3 << 'EOF'
sources = {
"Windows Endpoints (500 hosts)": 500 * 1500 * 200, # events * avg_bytes
"AD Domain Controllers (4)": 4 * 100000 * 200,
"Firewalls (3)": 3 * 1e9, # 1GB/day each
"Web Proxy": 5e9,
"DNS": 1e9,
}
total_bytes = sum(sources.values())
print(f"Estimated daily ingest: {total_bytes/1e9:.1f} GB/day")
print(f"Storage for 90-day retention: {total_bytes*90/1e12:.1f} TB")
EOF


2. Log Sources & Collection - What to Ingest and Why

Critical Log Sources by Priority

Tier 1 - Must have (detection severely degraded without these):

# Windows Security Event Log - authentication, privilege use, process creation
# Key Event IDs:
# 4624 - Successful logon (type 3=network, type 10=remote interactive)
# 4625 - Failed logon (brute force detection)
# 4648 - Logon with explicit credentials (Pass-the-Hash indicator)
# 4663 - Object access (file/registry access)
# 4688 - Process creation (requires audit policy + command line logging)
# 4698 - Scheduled task created (persistence)
# 4720 - User account created
# 4728/4732 - Member added to security/local group (privilege escalation)
# 4769 - Kerberos service ticket (Kerberoasting)
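# 4771 - Kerberos pre-auth failed (password spraying / brute force; AS-REP roasting instead shows up as 4768 with pre-auth type 0)
# 7045 - New service installed (psexec / malware persistence; recorded in the System log, not the Security log)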

# Enable detailed process creation logging (Windows)
# Via GPO: Computer Config -> Audit Policy -> Audit Process Creation -> Success
# AND enable command line in process creation events (run from an elevated cmd.exe prompt):
reg add "HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Policies\System\Audit" /v ProcessCreationIncludeCmdLine_Enabled /t REG_DWORD /d 1 /f

# Linux auditd - syscall and file access auditing
apt install auditd
cat > /etc/audit/rules.d/hardening.rules << 'EOF'
# Log all authentication events
-w /var/log/auth.log -p wa -k auth_log
-w /etc/passwd -p wa -k passwd_changes
-w /etc/shadow -p wa -k shadow_changes
-w /etc/sudoers -p wa -k sudoers_changes

# Log privilege escalation
-a always,exit -F arch=b64 -S setuid -S setgid -k privilege_escalation
-a always,exit -F arch=b64 -S execve -F euid=0 -k root_commands

# Log network connections (outbound - detect C2)
-a always,exit -F arch=b64 -S connect -k outbound_connections

# Log cron modifications (persistence)
-w /etc/cron.d -p wa -k cron_changes
-w /var/spool/cron -p wa -k cron_changes

# Log SSH key additions (persistence)
-w /root/.ssh -p wa -k ssh_keys
-w /home -p wa -k home_ssh_keys
EOF
auditctl -R /etc/audit/rules.d/hardening.rules
service auditd restart

Tier 2 - High value:

# PowerShell Script Block Logging - see exactly what PowerShell executed
# Even obfuscated/encoded commands are logged after decoding
# GPO: Computer Config -> Administrative Templates -> PowerShell ->
# Turn on PowerShell Script Block Logging -> Enabled

# Event ID 4104 = PowerShell script block executed
# Event ID 400/403 = PowerShell engine start/stop
# Event ID 4103 = Module logging

# DNS query logging - invaluable for C2/DGA/exfil detection
# Windows DNS Server: enable analytical/debug logging
dnscmd /config /logLevel 0x8100F331
dnscmd /config /logFilePath C:\Windows\System32\dns\dns.log

# Linux BIND:
# /etc/bind/named.conf.options:
# logging {
# channel query_log {
# file "/var/log/named/query.log" versions 3 size 20m;
# severity dynamic;
# print-time yes;
# };
# category queries { query_log; };
# };

# DHCP logs - IP to MAC/hostname mapping (essential for attribution)
# When you see a suspicious IP in firewall logs, DHCP tells you which device it was
# Windows DHCP: logs in C:\Windows\System32\dhcp\
# ISC DHCP: /var/log/dhcpd.log or journalctl -u isc-dhcp-server

# Web server access logs - all HTTP requests with status codes
# Apache/nginx: combined log format minimum
# Add: X-Forwarded-For logging, response time, TLS version, cipher

Log Collection with Elastic Agent

# Install Elastic Agent (replaces Filebeat/Winlogbeat/Metricbeat)
# Download from: https://www.elastic.co/downloads/elastic-agent

# Linux install
curl -L -O https://artifacts.elastic.co/downloads/beats/elastic-agent/elastic-agent-8.x.x-linux-x86_64.tar.gz
tar xzf elastic-agent-8.x.x-linux-x86_64.tar.gz
cd elastic-agent-8.x.x-linux-x86_64

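# Enroll agent to Fleet server (requires root)
# Trust the Fleet Server CA instead of passing --insecure, which disables TLS
# certificate verification and should be limited to lab use.
# /etc/ssl/ca.crt is a placeholder path for your CA certificate.
sudo ./elastic-agent install \
  --url=https://fleet.internal.corp:8220 \
  --enrollment-token=YOUR_ENROLLMENT_TOKEN \
  --certificate-authorities=/etc/ssl/ca.crt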

# Windows: deploy via GPO or SCCM
# MSI installer: elastic-agent-8.x.x-windows-x86_64.msi
# Command line (cmd.exe; enrollment options are passed through INSTALLARGS):
msiexec /i elastic-agent-8.x.x-windows-x86_64.msi /qn INSTALLARGS="--url=https://fleet.internal.corp:8220 --enrollment-token=YOUR_TOKEN"

# Verify agent status
elastic-agent status
elastic-agent inspect # Show current configuration

# Syslog collection (for network devices that can't run agents)
# rsyslog on collector server:
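cat > /etc/rsyslog.d/remote.conf << 'EOF'
# Receive syslog over TLS (port 6514) using imtcp with the gtls stream driver
# (requires the rsyslog-gnutls package)
global(
  DefaultNetstreamDriver="gtls"
  DefaultNetstreamDriverCAFile="/etc/ssl/ca.crt"
  DefaultNetstreamDriverCertFile="/etc/ssl/collector.crt"
  DefaultNetstreamDriverKeyFile="/etc/ssl/collector.key"
)
module(load="imtcp"
  StreamDriver.Name="gtls"
  StreamDriver.Mode="1"
  StreamDriver.AuthMode="x509/certvalid")
input(type="imtcp" port="6514")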

# Template: write to per-host files
$template RemoteLogs,"/var/log/remote/%HOSTNAME%/%PROGRAMNAME%.log"
*.* ?RemoteLogs
EOF
systemctl restart rsyslog

3. Detection Engineering - Sigma, YARA & Rule Development

Detection Engineering Principles

Detection engineering is the discipline of translating threat intelligence and adversary behavior into operational detection logic. The output is a rule - a precise logical expression that fires when specific conditions are met in telemetry.

Detection quality dimensions:

Dimension | Bad Rule | Good Rule
Specificity | Fires on any PowerShell execution | Fires on encoded commands with net.webclient
Sensitivity | Only fires on known hash | Fires on behavior pattern (regardless of hash)
Resilience | Blocked by renaming cmd.exe | Based on syscalls - rename-resistant
Maintainability | Hard-coded IOC list | Logic-based with external threat intel feed
Coverage | Single log source | Multi-source correlation

Sigma - Vendor-Agnostic Detection Rules

Sigma is a generic signature format for log-based detection. A single Sigma rule compiles to Splunk SPL, Elastic KQL, Microsoft Sentinel KQL, QRadar AQL, and others - write once, deploy everywhere.

# sigma/rules/windows/process_creation/proc_creation_win_powershell_download_cradle.yml
title: PowerShell Download Cradle Execution
id: 3b6ab547-8ec2-4991-b9d2-2b06702a753e
status: stable
description: Detects PowerShell download cradle patterns used to download and execute payloads
references:
    - https://attack.mitre.org/techniques/T1059/001/
author: Detection Engineering Team
date: 2024/01/15
tags:
    - attack.execution
    - attack.t1059.001
    - attack.defense_evasion
    - attack.t1027
logsource:
    category: process_creation # Log source category
    product: windows
detection:
    selection_powershell:
        Image|endswith:
            - '\powershell.exe'
            - '\pwsh.exe'
    selection_download:
        CommandLine|contains:
            - 'DownloadString' # Classic download cradle
            - 'DownloadFile'
            - 'WebClient'
            - 'IEX' # Invoke-Expression
            - 'Invoke-Expression'
            - 'IWR' # Invoke-WebRequest
            - 'Invoke-WebRequest'
            - 'Net.WebClient'
            - 'curl ' # Space after curl in PowerShell context
            - 'wget '
    selection_encoded:
        CommandLine|contains:
            - '-EncodedCommand'
            - '-enc '
            - '-ec '
    condition: selection_powershell and (selection_download or selection_encoded)
falsepositives:
    - Legitimate administrative scripts using WebClient
    - Software update scripts
level: high

# Convert Sigma rule to various SIEM query formats
pip install sigma-cli
sigma plugin install elasticsearch # Install Elasticsearch backend
sigma plugin install splunk # Install Splunk backend
sigma plugin install sentinel # Install Microsoft Sentinel backend

# Plugin, target, and pipeline names vary between releases; confirm what is available with:
sigma plugin list
sigma list targets
sigma list pipelines

# Convert to an Elasticsearch Lucene query (the ecs_windows pipeline maps fields to ECS)
sigma convert -t lucene -p ecs_windows \
  sigma/rules/windows/process_creation/proc_creation_win_powershell_download_cradle.yml

# Convert to Splunk SPL
sigma convert -t splunk -p splunk_windows \
  sigma/rules/windows/process_creation/proc_creation_win_powershell_download_cradle.yml

# Convert to Microsoft Sentinel KQL
sigma convert -t azuremonitor \
  -p sentinel \
  sigma/rules/windows/process_creation/proc_creation_win_powershell_download_cradle.yml

# Bulk convert entire rule directory (-o writes all queries to one output file)
# A comment cannot follow a trailing backslash, so options are explained here:
# --pipeline ecs_windows applies ECS field mapping
sigma convert -t lucene \
  --pipeline ecs_windows \
  sigma/rules/windows/ \
  -o /tmp/elastic_rules.txt

# Validate a Sigma rule
sigma check \
  sigma/rules/windows/process_creation/proc_creation_win_powershell_download_cradle.yml
More Sigma Rules - Key Attack Patterns

# Detect DCSync (T1003.006) - non-DC performing directory replication
title: DCSync Attack - Non-DC Replication Request
id: 56e0d8b8-3886-44d4-b1b5-5ff4f3e9ac56
logsource:
    product: windows
    service: security
detection:
    selection:
        EventID: 4662
        ObjectType|contains:
            - 'domainDNS'
            - '19195a5b-6da0-11d0-afd3-00c04fd930c9' # domainDNS schema GUID, logged when the name is not resolved
        AccessMask|contains: '0x100' # Control Access (extended right)
        Properties|contains:
            - '1131f6aa-9c07-11d1-f79f-00c04fc2dcd2' # DS-Replication-Get-Changes
            - '1131f6ad-9c07-11d1-f79f-00c04fc2dcd2' # DS-Replication-Get-Changes-All
    filter_dc:
        SubjectUserName|endswith: '$' # Machine accounts (DCs) end with $
    condition: selection and not filter_dc # Non-machine account = suspicious
level: critical
tags:
    - attack.credential_access
    - attack.t1003.006

# Detect Pass-the-Hash (T1550.002) - logon type 3 with no kerberos
title: Pass-the-Hash via NTLM Logon
id: a9e3c12d-8e4f-5c3b-9f2a-1d4e7b8c9a0f
logsource:
    product: windows
    service: security
detection:
    selection:
        EventID: 4624
        LogonType: 3 # Network logon
        AuthenticationPackageName: NTLM # NTLM used (not Kerberos)
        WorkstationName: '-' # Empty workstation = PtH indicator
    # Fields inside one selection are ANDed, so each exclusion gets its own filter
    filter_anonymous:
        TargetUserName: 'ANONYMOUS LOGON' # Filter anonymous
    filter_loopback:
        IpAddress: '127.0.0.1' # Filter loopback
    condition: selection and not 1 of filter_*
level: medium
tags:
    - attack.lateral_movement
    - attack.t1550.002

# Detect Kerberoasting - RC4 TGS requests
title: Kerberoasting - RC4 Encrypted TGS Request
id: 32d1f04a-9f7b-4c2e-8d3f-5a1b9c7e2d4f
logsource:
    product: windows
    service: security
detection:
    selection:
        EventID: 4769
        TicketEncryptionType: '0x17' # RC4-HMAC (weak, used for cracking)
        TicketOptions: '0x40810000' # Forwardable, Renewable
    filter_computers:
        ServiceName|endswith: '$' # Computer accounts (normal)
    filter_krbtgt:
        ServiceName: 'krbtgt'
    condition: selection and not filter_computers and not filter_krbtgt
level: high
tags:
    - attack.credential_access
    - attack.t1558.003

YARA - Memory & File-Based Detection

YARA rules match byte patterns and strings in files, processes, or memory - used by EDR, AV, and sandbox platforms:

// yara/rules/cobalt_strike_beacon.yar
rule CobaltStrike_Beacon_Default_Config
{
meta:
description = "Detects Cobalt Strike Beacon with default configuration"
author = "Detection Team"
date = "2024-01-15"
mitre = "T1059.003, T1071.001"
reference = "https://www.cobaltstrike.com"

strings:
// Default CS beacon metadata strings
$cs1 = "ReflectiveLoader" ascii wide
$cs2 = "%s as %s\\%s: %d" ascii
$cs3 = "Started service %s on %s" ascii

// Sleep mask identifier
$sleep = { 48 83 EC 28 B9 00 00 00 08 FF 15 }

// Common Cobalt Strike PE header characteristics
$mz = { 4D 5A } // MZ header
$pe = { 50 45 00 00 } // PE header

condition:
$mz at 0 and $pe and
2 of ($cs*) and $sleep
}

rule Mimikatz_Memory_Pattern
{
meta:
description = "Detects Mimikatz patterns in memory or files"
author = "Detection Team"
tags = "credential_access, T1003"

strings:
$m1 = "sekurlsa::logonpasswords" ascii wide nocase
$m2 = "lsadump::sam" ascii wide nocase
$m3 = "kerberos::golden" ascii wide nocase
$m4 = "privilege::debug" ascii wide nocase
$m5 = { 6D 69 6D 69 6B 61 74 7A } // "mimikatz" hex
$m6 = "Benjamin DELPY" ascii wide // Author string in binary

condition:
2 of them
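}

# Run YARA against files (the first argument is a rules file, not a directory)
yara cobalt_strike_beacon.yar /suspicious/file.exe
yara cobalt_strike_beacon.yar /tmp/memory_dump.bin

# Scan running processes: yara accepts a PID as the target (run as root)
for pid in $(ps -e -o pid=); do
  yara cobalt_strike_beacon.yar "$pid" 2>/dev/null
done

# Scan a directory recursively (-r recurses into the target directory)
# index.yar is assumed to be a rules file that includes the other rule files
# --print-meta: print rule metadata; --print-strings: print matching strings;
# --fail-on-warnings: strict mode
yara -r \
  --print-meta \
  --print-strings \
  --fail-on-warnings \
  /etc/yara/rules/index.yar /quarantine/

# Integrate with Zeek - scan files carved from network traffic
# files.log records file metadata; the extract-all-files policy script writes
# the carved files to ./extract_files/
zeek -C -r capture.pcap frameworks/files/extract-all-files
# Then scan extracted files:
find ./extract_files/ -type f -exec \
  yara /etc/yara/rules/index.yar {} \;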
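
The `2 of them` condition used in the Mimikatz rule can be sketched in plain Python to show what the scanner is deciding. This is a simplification under stated assumptions: it treats every pattern as case-insensitive ASCII, ignores `wide` strings and hex wildcards, and the function names are hypothetical rather than part of any YARA API.

```python
# Minimal sketch of YARA's "N of them" condition using plain byte searches.
MIMIKATZ_STRINGS = [
    b"sekurlsa::logonpasswords",
    b"lsadump::sam",
    b"kerberos::golden",
    b"privilege::debug",
    b"mimikatz",
]


def count_matches(data: bytes, patterns) -> int:
    """Count how many distinct patterns occur in data (case-insensitive)."""
    lowered = data.lower()
    return sum(1 for pattern in patterns if pattern.lower() in lowered)


def n_of_them(data: bytes, patterns, n: int = 2) -> bool:
    """Return True when at least n distinct patterns are present."""
    return count_matches(data, patterns) >= n


tool_like = b"\x00PRIVILEGE::DEBUG\x00sekurlsa::logonpasswords\x00"
single_mention = b"a blog post that mentions mimikatz once"

print(n_of_them(tool_like, MIMIKATZ_STRINGS))
print(n_of_them(single_mention, MIMIKATZ_STRINGS))
```

Requiring two distinct strings rather than one is what keeps a document that merely mentions the tool from matching.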

4. Elastic Stack (ELK) - Deployment, Pipelines & Detection

Ingest Pipeline - Field Extraction & Normalization

# Create an Elasticsearch ingest pipeline for Windows Security events
# The Painless source is a single-line JSON string: triple-quoted strings are
# only accepted by Kibana Dev Tools, not by the REST API when called with curl
curl -X PUT "https://elasticsearch:9200/_ingest/pipeline/windows-security" \
  -H "Content-Type: application/json" \
  -u elastic:PASSWORD \
  -d '{
  "description": "Windows Security Event Log processing pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:timestamp} %{DATA:hostname} %{DATA:process}\\[%{NUMBER:pid}\\]: %{GREEDYDATA:log_message}"
        ],
        "ignore_failure": true
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "value": "windows.security"
      }
    },
    {
      "script": {
        "lang": "painless",
        "description": "Normalize logon type to human-readable",
        "params": {
          "logon_types": {
            "2": "Interactive",
            "3": "Network",
            "4": "Batch",
            "5": "Service",
            "7": "Unlock",
            "8": "NetworkCleartext",
            "9": "NewCredentials",
            "10": "RemoteInteractive",
            "11": "CachedInteractive"
          }
        },
        "source": "def lt = ctx.winlog?.event_data?.LogonType; if (lt != null) { String s = lt.toString(); ctx.logon_type_name = params.logon_types.getOrDefault(s, \"Unknown(\" + s + \")\"); }"
      }
    }
  ]
}'

Kibana Detection Rules - KQL & EQL

# KQL (Kibana Query Language) - field:value matching
# Search for Pass-the-Hash indicators in Kibana Discover:
winlog.event_id:4624 AND winlog.event_data.LogonType:3 AND winlog.event_data.AuthenticationPackageName:NTLM

# Search for scheduled task creation
winlog.event_id:4698 AND NOT user.name:SYSTEM

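# Detect large numbers of failed logons from same IP (brute force)
# KQL cannot aggregate; use a threshold rule grouped by source.ip, or an ES|QL query such as:
FROM logs-* | WHERE event.code == "4625" | STATS failures = COUNT(*) BY source.ip | WHERE failures > 20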

# EQL (Event Query Language) - sequence detection across multiple events
# Detect: PowerShell spawning cmd.exe spawning net.exe (classic lateral movement chain)
# The query must be a single-line JSON string when sent with curl; "by host.name"
# keeps the sequence on one host
curl -X GET "https://elasticsearch:9200/logs-*/_eql/search" \
  -H "Content-Type: application/json" \
  -u elastic:PASSWORD \
  -d '{
  "query": "sequence by host.name with maxspan=2m [process where process.name == \"powershell.exe\"] [process where process.name == \"cmd.exe\" and process.parent.name == \"powershell.exe\"] [process where process.name in (\"net.exe\", \"net1.exe\") and process.parent.name == \"cmd.exe\"]",
  "size": 100
}'

# EQL: Detect persistence via registry run key
curl -X GET "https://elasticsearch:9200/logs-*/_eql/search" \
  -H "Content-Type: application/json" \
  -u elastic:PASSWORD \
  -d '{
  "query": "sequence by host.name with maxspan=30s [registry where registry.path like~ \"*\\\\Run\\\\*\" and registry.data.strings like~ (\"*.exe*\", \"*.dll*\", \"*.ps1*\")] [process where process.name in (\"cmd.exe\", \"powershell.exe\", \"wscript.exe\")]",
  "size": 50
}'

Elasticsearch Index Lifecycle Management

# Configure ILM policy for log retention tiers
# hot: roll index after 50GB or after 1 day; warm after 7 days; cold after 30 days;
# delete after 1 year. JSON does not allow inline comments, so they live here.
# The freeze action is omitted: it is deprecated and has no effect on Elasticsearch 8.x.
curl -X PUT "https://elasticsearch:9200/_ilm/policy/security-logs-policy" \
  -H "Content-Type: application/json" \
  -u elastic:PASSWORD \
  -d '{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}'
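
The phase boundaries in the policy translate directly into per-tier storage requirements. The sketch below is a rough sizing aid under explicit assumptions: a constant daily ingest (9.2 GB/day is used as an example input), no replicas, no compression, and phase ages counted from ingest rather than from rollover. The function name is hypothetical.

```python
# Rough sketch: estimate raw storage per ILM tier from the phase boundaries.
def tier_storage_gb(daily_gb: float, warm_after: int = 7,
                    cold_after: int = 30, delete_after: int = 365) -> dict:
    """Return GB held in each tier, assuming constant ingest and no replicas."""
    days_in_tier = {
        "hot": warm_after,                    # day 0 up to the warm transition
        "warm": cold_after - warm_after,      # warm transition up to cold
        "cold": delete_after - cold_after,    # cold transition up to deletion
    }
    return {tier: daily_gb * days for tier, days in days_in_tier.items()}


usage = tier_storage_gb(9.2)
for tier, gigabytes in usage.items():
    print(f"{tier:>5}: {gigabytes:,.1f} GB")
print(f"total: {sum(usage.values()):,.1f} GB")
```

Multiply each figure by the replica count plus one for a replicated cluster; the cold tier dominates because it covers most of the retention period.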

5. Splunk - SPL, Correlation Searches & Dashboards

Splunk's Search Processing Language (SPL) is a pipeline-based query language where each command transforms the result set of the previous command.
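
The pipeline model can be mirrored in a few lines of Python, which may help readers new to SPL. This is an analogy, not Splunk code: the functions `search`, `bucket`, `stats_count`, and `where` are hypothetical stand-ins for the SPL commands of the same name, and the events are invented samples.

```python
# Minimal sketch: each stage consumes the previous stage's result set,
# mirroring "search | bucket | stats count by | where" in SPL.
from collections import Counter


def search(events, **criteria):
    """Keep events whose fields equal every given criterion."""
    return [e for e in events if all(e.get(k) == v for k, v in criteria.items())]


def bucket(events, span_seconds):
    """Round _time down to the start of its time bucket."""
    return [{**e, "_time": e["_time"] - e["_time"] % span_seconds} for e in events]


def stats_count(events, by):
    """Count events per unique combination of the 'by' fields."""
    counts = Counter(tuple(e[field] for field in by) for e in events)
    return [dict(zip(by, key), count=n) for key, n in counts.items()]


def where(rows, predicate):
    """Keep rows for which the predicate is true."""
    return [row for row in rows if predicate(row)]


BASE = 1_700_000_100  # epoch seconds, aligned to a 5-minute boundary
events = [{"_time": BASE + i * 10, "EventCode": 4625,
           "src_ip": "198.51.100.7", "user": "alice"} for i in range(12)]
events += [
    {"_time": BASE + 5, "EventCode": 4625, "src_ip": "192.0.2.9", "user": "bob"},
    {"_time": BASE + 20, "EventCode": 4624, "src_ip": "192.0.2.9", "user": "bob"},
]

result = where(
    stats_count(bucket(search(events, EventCode=4625), 300),
                by=["_time", "src_ip", "user"]),
    lambda row: row["count"] > 10,
)
print(result)
```

Only the source with more than ten failures in one five-minute bucket survives the final stage, which is the shape of the brute force search that follows.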

Core SPL Patterns for Security Detection

Detect brute force login attempts:
index=windows EventCode=4625
| bucket _time span=5m
| stats count by _time, src_ip, user
| where count > 10
| sort - count
| table _time, src_ip, user, count

Detect lateral movement - same account logging into multiple hosts rapidly:
index=windows EventCode=4624 LogonType=3
| bucket _time span=10m
| stats dc(host) as unique_hosts, values(host) as hosts by _time, user
| where unique_hosts > 5
| sort - unique_hosts

Detect PowerShell download cradle (process creation with command line):
index=windows EventCode=4688
| where like(CommandLine, "%DownloadString%")
    OR like(CommandLine, "%IEX%")
    OR like(CommandLine, "%-EncodedCommand%")
    OR like(CommandLine, "%WebClient%")
| eval risk_score=case(
    like(CommandLine, "%-EncodedCommand%"), 90,
    like(CommandLine, "%IEX%") AND like(CommandLine, "%WebClient%"), 95,
    true(), 70)
| table _time, host, user, CommandLine, risk_score
| sort - risk_score

Kerberoasting detection - RC4 TGS requests:
index=windows EventCode=4769
| where TicketEncryptionType="0x17"
| bucket _time span=1m
| stats count, values(ServiceName) as services by _time, src_ip
| where count > 3
| eval alert="Possible Kerberoasting"
| table _time, src_ip, count, services, alert

DNS tunneling detection - unusually long domain names (query names longer than 52 characters are suspicious):
index=dns
| eval qname_length=len(query)
| where qname_length > 52
| stats count by src_ip, query
| where count > 20
| sort - count

Beaconing detection - regular outbound connections (low variance in hourly counts = periodic = beacon):
index=network dest_port=443
| bucket _time span=1h
| stats count by _time, src_ip, dest_ip
| stats stdev(count) as jitter, avg(count) as avg_conns by src_ip, dest_ip
| where avg_conns > 3 AND jitter < 1.5
| sort - avg_conns

Find accounts with multiple failed logons followed by success (successful brute force):
index=windows (EventCode=4625 OR EventCode=4624)
| eval event_type=if(EventCode==4625, "failure", "success")
| sort _time
| streamstats count(eval(event_type="failure")) as fail_count
    count(eval(event_type="success")) as success_count
    by user
| where success_count=1 AND fail_count > 5
| dedup user
| table user, fail_count, _time, host, src_ip
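
The beaconing search above measures variance in hourly connection counts. A complementary approach, sketched here in Python, looks at the regularity of the gaps between connections instead; this is a swapped-in technique for illustration, not what the SPL does. The function name, the sample timestamps, and the 0.1 threshold are assumptions.

```python
# Minimal sketch: score beacon-like regularity from inter-arrival times.
# A low coefficient of variation (stdev / mean of the gaps) means the
# connections are evenly spaced, which is typical of automated check-ins.
from statistics import mean, pstdev


def beacon_score(timestamps):
    """Return the coefficient of variation of the gaps, or None if too few."""
    ordered = sorted(timestamps)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    if len(gaps) < 2 or mean(gaps) == 0:
        return None
    return pstdev(gaps) / mean(gaps)


# Seconds since the first observed connection for one src_ip/dest_ip pair
beacon_like = [0, 60, 121, 180, 241, 300, 359]   # roughly every 60 seconds
human_like = [0, 3, 4, 90, 95, 400, 1200]        # bursty, irregular

THRESHOLD = 0.1  # hypothetical cut-off; tune against your own traffic

for label, series in [("beacon_like", beacon_like), ("human_like", human_like)]:
    score = beacon_score(series)
    print(f"{label}: score={score:.3f} suspicious={score < THRESHOLD}")
```

Implants that add jitter to their sleep interval raise the score, so a fixed threshold catches only the least careful beacons.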

Splunk Notable Events & Risk-Based Alerting

Risk-Based Alerting - aggregate risk scores per entity:
| tstats summariesonly=true sum(All_Risk.calculated_risk_score) as risk_score
    count(All_Risk.calculated_risk_score) as risk_event_count
    values(All_Risk.annotations.mitre_attack.mitre_technique_id) as mitre_techniques
    from datamodel=Risk
    by All_Risk.risk_object, All_Risk.risk_object_type, _time span=24h
| `drop_dm_object_name("All_Risk")`
| where risk_score > 200
| sort - risk_score
| table risk_object, risk_object_type, risk_score, risk_event_count, mitre_techniques

Threat hunting - find all processes spawned by Office applications (wildcards allow for full paths; search is case-insensitive):
index=windows EventCode=4688
| search ParentProcessName IN ("*winword.exe","*excel.exe","*powerpnt.exe","*outlook.exe")
    NOT ProcessName IN ("*splwow64.exe","*AcroRd32.exe","*winword.exe")
| stats count by host, ParentProcessName, ProcessName, CommandLine
| sort - count

Hunt for living-off-the-land binaries (LOLBins) used for execution and defense evasion:
index=windows EventCode=4688
| search ProcessName IN (
    "*certutil.exe","*mshta.exe","*regsvr32.exe","*rundll32.exe",
    "*wscript.exe","*cscript.exe","*msiexec.exe","*installutil.exe",
    "*odbcconf.exe","*regasm.exe","*regsvcs.exe","*cmstp.exe")
| stats count by host, ProcessName, CommandLine, user
| sort - count

6. Behavioral Analytics - UBA, Baselining & Anomaly Detection

Building Behavioral Baselines

#!/usr/bin/env python3
# baseline_logon_hours.py - build per-user logon hour baseline
# Input: Windows Security Event 4624 logs as JSON (one event per line)

import json
import pandas as pd

# Load logon events (exported from SIEM as JSON lines)
events = []
with open('/tmp/logon_events.json') as f:
    for line in f:
        if line.strip():
            events.append(json.loads(line))

df = pd.DataFrame(events)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['dayofweek'] = df['timestamp'].dt.dayofweek # 0=Monday, 6=Sunday

# Split history from the window under test. Building the baseline from all
# events would include the recent ones, so they could never be flagged.
cutoff = df['timestamp'].max() - pd.Timedelta('1d')
history = df[df['timestamp'] <= cutoff]
recent_events = df[df['timestamp'] > cutoff]

# Build baseline: for each user, what hours do they normally log in?
baseline = history.groupby(['user', 'hour', 'dayofweek']).size().reset_index(name='count')

def is_anomalous_logon(user, hour, dayofweek, baseline_df, threshold=0):
    """Return True if this logon time was seen at most `threshold` times for this user"""
    match = baseline_df[
        (baseline_df['user'] == user) &
        (baseline_df['hour'] == hour) &
        (baseline_df['dayofweek'] == dayofweek)
    ]
    return len(match) == 0 or match['count'].iloc[0] <= threshold

# Test recent events against baseline
for _, event in recent_events.iterrows():
    if is_anomalous_logon(event['user'], event['hour'], event['dayofweek'], baseline):
        print(f"[ANOMALY] User: {event['user']} | "
              f"Hour: {event['hour']}:00 | "
              f"Day: {event['dayofweek']} | "
              f"Source: {event.get('src_ip', 'unknown')} | "
              f"Host: {event.get('host', 'unknown')}")

Elastic ML - Anomaly Detection Jobs

# Create ML job for unusual process activity
curl -X PUT "https://elasticsearch:9200/_ml/anomaly_detectors/unusual_process_activity" \
-H "Content-Type: application/json" \
-u elastic:PASSWORD \
-d '{
"description": "Detect unusual process execution patterns per host",
"analysis_config": {
"bucket_span": "15m",
"detectors": [
{
"detector_description": "Rare process per host",
"function": "rare",
"by_field_name": "process.name",
"partition_field_name": "host.name"
},
{
"detector_description": "Unusual process count",
"function": "high_count",
"by_field_name": "process.name",
"partition_field_name": "host.name"
}
],
"influencers": ["host.name", "user.name", "process.name"]
},
"data_description": {
"time_field": "@timestamp",
"time_format": "epoch_ms"
},
"analysis_limits": {
"model_memory_limit": "256mb"
}
}'

# Open and start the job
curl -X POST "https://elasticsearch:9200/_ml/anomaly_detectors/unusual_process_activity/_open" \
-u elastic:PASSWORD

# Create datafeed (connects job to index)
curl -X PUT "https://elasticsearch:9200/_ml/datafeeds/datafeed-unusual_process_activity" \
-H "Content-Type: application/json" \
-u elastic:PASSWORD \
-d '{
"job_id": "unusual_process_activity",
"indices": ["logs-endpoint.events.process-*"],
"query": {
"bool": {
"filter": [{"range": {"@timestamp": {"gte": "now-24h"}}}]
}
}
}'

# Start datafeed
curl -X POST "https://elasticsearch:9200/_ml/datafeeds/datafeed-unusual_process_activity/_start" \
-u elastic:PASSWORD

7. SOAR - Playbook Automation & Orchestration

SOAR Architecture

SOAR (Security Orchestration, Automation and Response) reduces mean time to respond (MTTR) by automating repeatable investigation and response steps. A well-built playbook handles the first 80% of alert triage automatically, presenting analysts with enriched, pre-investigated cases.

Key SOAR platforms: Splunk SOAR (formerly Phantom), Palo Alto XSOAR, TheHive + Cortex, Shuffle (open-source).

Shuffle - Open-Source SOAR Playbook

# Shuffle workflow (Python action) - automated phishing response
# Triggered when: email security alert fires for suspicious email

import base64
import os
import requests

VT_API_KEY = os.environ["VT_API_KEY"]  # VirusTotal API key, supplied via environment
VT_BASE = "https://www.virustotal.com/api/v3"

# block_ip_on_firewall, delete_email_from_exchange and create_jira_ticket are
# placeholders for site-specific integrations and must be defined elsewhere.

def vt_malicious_count(path):
    """Return the 'malicious' engine count for a VirusTotal v3 object (0 if unavailable)."""
    resp = requests.get(f"{VT_BASE}/{path}",
                        headers={"x-apikey": VT_API_KEY}, timeout=30)
    if resp.status_code != 200:
        return 0
    return resp.json().get('data', {}).get('attributes', {}) \
        .get('last_analysis_stats', {}).get('malicious', 0)

def analyze_phishing_email(email_data):
    """
    Automated phishing email triage playbook
    Steps: extract IOCs -> enrich -> contain -> notify
    """
    results = {}

    # Step 1: Extract IOCs from email
    sender_ip = email_data.get('sender_ip')
    sender_domain = email_data.get('sender_domain')
    urls = email_data.get('urls', [])
    attachments = email_data.get('attachments', [])
    results['sender_domain'] = sender_domain

    # Step 2: Enrich IP reputation (VirusTotal)
    malicious_votes = vt_malicious_count(f"ip_addresses/{sender_ip}") if sender_ip else 0
    results['ip_malicious_votes'] = malicious_votes

    # Step 3: Check URLs against threat intel
    # VirusTotal v3 identifies a URL by its unpadded URL-safe base64 encoding
    for url in urls:
        url_id = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
        results.setdefault('url_results', []).append({
            'url': url,
            'malicious_votes': vt_malicious_count(f"urls/{url_id}")
        })

    # Step 4: Look up attachment hashes
    for attachment in attachments:
        results.setdefault('attachment_results', []).append({
            'filename': attachment['name'],
            'sha256': attachment['sha256'],
            'malicious_votes': vt_malicious_count(f"files/{attachment['sha256']}")
        })

    # Step 5: Automated response based on risk score
    # Count only URLs that at least one engine flagged, not every URL seen
    malicious_urls = sum(1 for u in results.get('url_results', [])
                         if u['malicious_votes'] > 0)
    risk_score = malicious_votes * 10 + malicious_urls

    if risk_score > 50:
        # High confidence malicious - auto-contain
        # Block sender IP on firewall
        block_ip_on_firewall(sender_ip)
        # Delete email from all mailboxes
        delete_email_from_exchange(email_data['message_id'])
        # Create high-priority ticket
        create_jira_ticket(severity="HIGH", data=results)
        results['action'] = 'AUTO_CONTAINED'
    elif risk_score > 20:
        # Medium confidence - escalate to analyst
        create_jira_ticket(severity="MEDIUM", data=results)
        results['action'] = 'ESCALATED_TO_ANALYST'
    else:
        # Low risk - log and close
        results['action'] = 'CLOSED_LOW_RISK'

    return results

TheHive + Cortex - Open-Source IR Platform

# TheHive API - create a case from a SIEM alert
curl -X POST "https://thehive.internal.corp:9000/api/case" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"title": "Possible Kerberoasting - WORKSTATION-05",
"description": "Event 4769 with RC4 encryption detected. Multiple service accounts targeted.",
"severity": 3,
"tags": ["kerberoasting", "credential_access", "T1558.003"],
"tasks": [
{"title": "Identify source account", "status": "Waiting"},
{"title": "Check if hash was cracked", "status": "Waiting"},
{"title": "Review lateral movement from source IP", "status": "Waiting"},
{"title": "Reset targeted service account passwords", "status": "Waiting"}
]
}'

# Cortex analyzer - enrich an observable (IP address) automatically
curl -X POST "https://cortex.internal.corp:9001/api/analyzer/Shodan_Host_1/run" \
-H "Authorization: Bearer CORTEX_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"data": "203.0.113.42",
"dataType": "ip",
"tlp": 2
}'

# Get analyzer results
curl -X GET "https://cortex.internal.corp:9001/api/job/JOB_ID/report" \
-H "Authorization: Bearer CORTEX_API_KEY"

8. Alert Triage & False Positive Management

Alert Quality Framework

Every alert should be evaluated on two axes:

  • True Positive Rate (TPR): How often does this alert fire on real attacks?
  • False Positive Rate (FPR): How often does it fire on benign activity?

Alert fatigue - analysts ignoring alerts because too many are false positives - is one of the primary causes of breach dwell time. A SOC with 10 high-fidelity detections is operationally superior to one with 500 noisy ones.
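
The two axes are easy to conflate with what analysts actually experience, which is precision (the share of fired alerts that are real). The sketch below computes all three from confusion-matrix counts; the counts are invented for illustration and the function name is hypothetical.

```python
# Minimal sketch: TPR, FPR, and precision from confusion-matrix counts.
def alert_metrics(tp: int, fp: int, fn: int, tn: int) -> dict:
    """tp/fp = alerts on attacks/benign activity; fn = missed attacks; tn = quiet benign events."""
    return {
        "tpr": tp / (tp + fn) if tp + fn else 0.0,        # share of attacks caught
        "fpr": fp / (fp + tn) if fp + tn else 0.0,        # share of benign events alerted
        "precision": tp / (tp + fp) if tp + fp else 0.0,  # share of alerts that are real
    }


# Hypothetical rule evaluated over 10,000 events containing 10 real attacks
example = alert_metrics(tp=8, fp=40, fn=2, tn=9950)
for name, value in example.items():
    print(f"{name}: {value:.3f}")
```

In this example the rule catches 80% of attacks with a false positive rate under half a percent, yet only one alert in six is real, because benign events vastly outnumber attacks. That gap is what drives alert fatigue.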

#!/usr/bin/env python3
# alert_quality_metrics.py - calculate alert quality from analyst dispositions

import json
from collections import defaultdict

# Load alert disposition data (exported from SIEM/ticketing)
# Format: {rule_name, disposition (TP/FP/Benign_True_Positive), analyst, timestamp}
with open('/tmp/alert_dispositions.json') as f:
    dispositions = json.load(f)

metrics = defaultdict(lambda: {'TP': 0, 'FP': 0, 'BTP': 0, 'total': 0})

for alert in dispositions:
    rule = alert['rule_name']
    disp = alert['disposition']
    if disp == 'Benign_True_Positive':
        disp = 'BTP'  # Store under the short key used in the metrics dict
    metrics[rule][disp] = metrics[rule].get(disp, 0) + 1
    metrics[rule]['total'] += 1

print(f"{'Rule':<50} {'Total':>6} {'TP%':>6} {'FP%':>6} {'Signal':>8}")
print("-" * 80)

for rule, m in sorted(metrics.items(), key=lambda x: x[1]['total'], reverse=True):
    total = m['total']
    tp_pct = (m.get('TP', 0) / total) * 100
    fp_pct = (m.get('FP', 0) / total) * 100
    # Signal score: high TP + low FP = high signal
    signal = tp_pct - (fp_pct * 2)

    flag = "NOISY" if fp_pct > 50 else ("GOOD" if tp_pct > 70 else "")
    print(f"{rule:<50} {total:>6} {tp_pct:>5.1f}% {fp_pct:>5.1f}% {signal:>7.1f} {flag}")

Tuning Playbook

# When a rule has > 50% FP rate, tune it:

# Step 1: Understand the FP pattern
# Query all FP dispositions for the noisy rule and find common characteristics
# Splunk:
index=notable_events rule_name="PowerShell Download Cradle" disposition=false_positive
| stats count by user, CommandLine, host
| sort - count

# Step 2: Add exception (safe list) without deleting the rule
# Option A: Exclude specific users/hosts
# In Sigma rule - add filter:
# filter_known_good:
# user|contains:
# - 'svc-deployagent' # Deployment automation service
# - 'SCCM-Client'
# CommandLine|contains:
# - 'WindowsUpdate.ps1' # Known legitimate script

# Option B: Raise threshold (only alert after N occurrences)
# Option C: Add risk score instead of alert (alert only when combined risk > threshold)

# Step 3: Document the tuning decision
# Every suppression must be documented with:
# - What was suppressed
# - Why (specific FP pattern)
# - Review date (suppressions expire and get re-evaluated)
# - Owner (who approved)
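
The documentation requirements in Step 3 lend themselves to a small structured record that can be checked automatically. The following Python sketch is one possible shape, not a prescribed format: the class, field names, sample entries, and dates are all hypothetical.

```python
# Minimal sketch: record each suppression and list the ones due for review.
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Suppression:
    rule_name: str      # Detection the suppression applies to
    what: str           # What was suppressed
    why: str            # The specific FP pattern that justified it
    owner: str          # Who approved
    review_date: date   # When the suppression must be re-evaluated

    def is_due(self, today: date) -> bool:
        """A suppression is due once its review date has been reached."""
        return today >= self.review_date


suppressions = [
    Suppression("PowerShell Download Cradle", "user svc-deployagent",
                "Deployment automation downloads packages", "soc-lead",
                date(2024, 4, 1)),
    Suppression("PowerShell Download Cradle", "script WindowsUpdate.ps1",
                "Known legitimate update script", "soc-lead",
                date(2024, 9, 1)),
]

today = date(2024, 6, 1)  # fixed date so the example is reproducible
due = [s for s in suppressions if s.is_due(today)]
for s in due:
    print(f"REVIEW DUE: {s.rule_name} / {s.what} (owner: {s.owner})")
```

Running a check like this on a schedule keeps suppressions from silently outliving the conditions that justified them.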

9. Detection Coverage Mapping - ATT&CK & Gap Analysis

ATT&CK Coverage Assessment

#!/usr/bin/env python3
# coverage_map.py - map existing detections to ATT&CK techniques
# Uses the MITRE ATT&CK Navigator layer format

import json

# Your current detections mapped to ATT&CK technique IDs
current_detections = {
    "T1059.001": {"name": "PowerShell", "coverage": "high", "rules": 4},
    "T1558.003": {"name": "Kerberoasting", "coverage": "high", "rules": 2},
    "T1003.006": {"name": "DCSync", "coverage": "high", "rules": 2},
    "T1071.001": {"name": "Web Protocols C2", "coverage": "medium", "rules": 3},
    "T1046": {"name": "Port Scanning", "coverage": "low", "rules": 1},
    "T1021.002": {"name": "SMB Lateral", "coverage": "medium", "rules": 2},
    "T1548.002": {"name": "UAC Bypass", "coverage": "none", "rules": 0},
    "T1055": {"name": "Process Injection", "coverage": "none", "rules": 0},
    "T1027": {"name": "Obfuscation", "coverage": "low", "rules": 1},
    "T1574": {"name": "DLL Hijacking", "coverage": "none", "rules": 0},
}

# Generate ATT&CK Navigator layer
color_map = {"high": "#00aa00", "medium": "#ffaa00", "low": "#ff5500", "none": "#ff0000"}
score_map = {"high": 100, "medium": 66, "low": 33, "none": 0}

layer = {
    "name": "SOC Detection Coverage",
    # The layer format version is required alongside the ATT&CK and Navigator versions
    "versions": {"attack": "14", "navigator": "4.9", "layer": "4.5"},
    "domain": "enterprise-attack",
    "techniques": []
}

for technique_id, info in current_detections.items():
    layer["techniques"].append({
        "techniqueID": technique_id,
        "color": color_map[info["coverage"]],
        "comment": f"Rules: {info['rules']} | Coverage: {info['coverage']}",
        "enabled": True,
        "score": score_map[info["coverage"]]
    })

with open('/tmp/attack_coverage_layer.json', 'w') as f:
    json.dump(layer, f, indent=2)

print("Coverage summary:")
for level in ["high", "medium", "low", "none"]:
    count = sum(1 for v in current_detections.values() if v["coverage"] == level)
    print(f" {level:>8}: {count} techniques")

# Upload to ATT&CK Navigator: https://mitre-attack.github.io/attack-navigator/
# File -> Open Existing Layer -> upload attack_coverage_layer.json

# Atomic Red Team - validate detections actually fire
# Atomic Red Team provides test cases for many ATT&CK techniques

# Install
Install-Module -Name invoke-atomicredteam -Scope CurrentUser

# Run a specific technique test (T1558.003 = Kerberoasting)
Invoke-AtomicTest T1558.003 -TestNumbers 1 # Run test #1
# Check your SIEM - did the alert fire?

# Check prerequisites, execute, then clean up
Invoke-AtomicTest T1059.001 -TestNumbers 2 -CheckPrereqs
Invoke-AtomicTest T1059.001 -TestNumbers 2 # Execute
Invoke-AtomicTest T1059.001 -TestNumbers 2 -Cleanup # Clean up artifacts

# Batch test coverage gaps
$gap_techniques = @("T1548.002", "T1055", "T1574", "T1027")
foreach ($tech in $gap_techniques) {
    Write-Host "Testing $tech..."
    Invoke-AtomicTest $tech -CheckPrereqs -Confirm:$false
    Invoke-AtomicTest $tech -Confirm:$false
    Start-Sleep 30 # Wait for SIEM to process
    Write-Host "Check SIEM for $tech alert"
}

10. MITRE ATT&CK Mapping

Technique | ID | Detection Method | Log Source
Command and Scripting: PowerShell | T1059.001 | Script block logging Event 4104 | Windows PowerShell log
OS Credential Dumping: DCSync | T1003.006 | Event 4662, replication GUID | Windows Security log
Steal Kerberos Tickets: Kerberoasting | T1558.003 | Event 4769 RC4 encryption | Windows Security log
Lateral Movement: Pass-the-Hash | T1550.002 | Event 4624 LogonType 3 NTLM | Windows Security log
Scheduled Task Persistence | T1053.005 | Event 4698/4702 | Windows Security log
Exfiltration: DNS Tunneling | T1048.003 | Long subdomain queries | DNS query logs
C2: Beaconing | T1071.001 | Flow timing analysis | Zeek conn.log / firewall
Discovery: Port Scan | T1046 | Cross-zone scan alerts | Firewall / IDS
Defense Evasion: LOLBins | T1218 | Signed system binary proxy execution | Sysmon Event 1, auditd
Account Creation | T1136.001 | Event 4720 | Windows Security log
Collection: Email Harvesting | T1114 | Mail gateway logs | Exchange/O365 audit
Phishing: Spearphishing | T1566.001 | Email security alerts + 4688 | Email gateway + EDR

End of Chapter 4.2 - SIEM, SOAR & Detection Engineering

Next: Chapter 4.3 - Incident Response & Digital Forensics