index=azure sourcetype="o365:management:activity" — UAL / O365 audit stream
index=msdefender sourcetype="ms:defender:atp:alerts" — Defender ATP alerts
Environment-dependent indexes (names vary by Add-on config — see Section 1 for identification queries):
idx_ms_o365_retention idx_ms_purview_insider_risk idx_ms_defender_incidents idx_ms_defender_hunting idx_ms_service_health
KPI summary: purview_summary kpi_purview_health_daily kpi_dlp_effectiveness_daily kpi_investigation_operations_daily kpi_retention_lifecycle_daily kpi_insider_risk_daily kpi_exec_control_score_monthly
Note — AlertId join: Defender alert
id values carry a "dl" prefix that must be stripped before joining to UAL AlertId. Use | eval AlertId=ltrim(id, "dl") in all Defender→Azure join queries.
Run these before building any fact tables. Confirm event count, time range, and sourcetype for each index.
index=azure (UAL/O365, sourcetype o365:management:activity) and index=msdefender (Defender ATP alerts, sourcetype ms:defender:atp:alerts). The remaining indexes — retention, insider risk, incidents, hunting, service health — depend on your Add-on configuration. Run query 1.1b to discover them, then substitute the correct names throughout.How to identify an unknown index: Run
| tstats count WHERE index=* sourcetype="<expected_sourcetype>" BY index to find which index holds a given sourcetype. For Defender incidents check sourcetype="ms:defender:incidents" or sourcetype="ms:defender:atp:incident" depending on Add-on version. For service health check sourcetype="ms:o365:management:servicehealth".
index=azure OR index=msdefender | stats count min(_time) as earliest max(_time) as latest by index, sourcetype | eval earliest=strftime(earliest,"%Y-%m-%d %H:%M") | eval latest=strftime(latest,"%Y-%m-%d %H:%M") | sort - count
idx_ms_o365_retention, idx_ms_purview_insider_risk, idx_ms_defender_incidents, idx_ms_defender_hunting, idx_ms_service_health.| tstats count WHERE index=* (sourcetype=ms:* OR sourcetype=o365:*)
BY index sourcetype
| sort - count
index=azure.index=azure sourcetype="o365:management:activity" earliest=-7d | spath RecordType | stats count by RecordType | lookup recordtype_lookup RecordType OUTPUT RecordTypeName | sort - count
index=azure with Category=DataLossPrevention scopes to DLP events within the unified audit log.index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-30d
| timechart span=1d count as dlp_events
idx_ms_defender_incidents with the index name identified in your environment via query 1.1b. Likely sourcetype: ms:defender:incidents or ms:defender:atp:incident. This is separate from index=msdefender (alerts). Confirm with your Add-on documentation.index=idx_ms_defender_incidents earliest=-1d
| head 5
| fields incidentId, severity, status, classification, assignedTo,
createdTime, lastUpdateTime, detectionSource, tags
idx_ms_service_health with the correct index in your environment. Typical sourcetype: ms:o365:management:servicehealth. Run query 1.1b to discover it.index=idx_ms_service_health earliest=-30d | spath | stats count by service, status, classification | sort - count
Extracts structured fields from raw DLP audit events. Source: index=azure with Category=DataLossPrevention (DLP stream) or RecordType=11 (DLPRuleMatch) in the general audit stream.
index=azure. Direct DLP audit events use the nested PolicyDetails{}.Rules{}.Actions{} structure (queries 2.1–2.4 below). DLP events correlated via a Defender alert and written back to the UAL use a flat schema with Name (Rule name) and Data (metadata JSON) as top-level fields — see query 2.5. The AlertId field is the join key between these two event classes. If you run the nested spath queries against Defender-correlated events, policy_name and rule_name will come back empty.
PolicyDetails and SensitiveInfoDetectionIsIncluded arrays. UAL index: index=azure sourcetype="o365:management:activity" Category=DataLossPrevention.index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
| spath
| spath path=PolicyDetails{} output=policy_details_raw
| mvexpand policy_details_raw
| spath input=policy_details_raw
| rename PolicyDetails{}.PolicyName as policy_name
PolicyDetails{}.PolicyId as policy_id
PolicyDetails{}.Rules{} as rules_raw
| mvexpand rules_raw
| spath input=rules_raw
| rename Rules{}.RuleName as rule_name
Rules{}.Actions{} as action
Rules{}.OverrideDetails{} as override_details
| eval ts = strftime(_time, "%Y-%m-%d")
| eval workload = coalesce(Workload, "Unknown")
| eval user_upn = coalesce(UserId, UserKey, "Unknown")
| eval object_id = coalesce(ObjectId, SharePointMetaData.FileName, "Unknown")
| eval action = coalesce(action, "None")
| eval sensitivity = coalesce(SensitivityLabelIds, "Unlabeled")
| fields ts, _time, policy_name, policy_id, rule_name, action,
workload, user_upn, object_id, sensitivity, override_details
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
| spath
| spath path=PolicyDetails{}.Rules{}.ConditionsMatched.SensitiveInformation{}
output=sit_array
| mvexpand sit_array
| spath input=sit_array
| rename SensitiveType as sit_name
Count as sit_count
Confidence as sit_confidence
| stats count sum(sit_count) as total_sit_matches
by sit_name, sit_confidence, Workload
| sort - total_sit_matches
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
| spath
| spath path=PolicyDetails{} output=pd
| mvexpand pd
| spath input=pd
| rename PolicyDetails{}.Rules{} as rules_raw
| mvexpand rules_raw
| spath input=rules_raw
| search Rules{}.Actions{} IN ("Override", "ReportIncident")
| spath path=Rules{}.OverrideDetails{}.Justification output=justification
| stats count by Rules{}.RuleName, justification
| sort - count
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-30d
| spath
| spath path=PolicyDetails{}.Rules{}.Actions{} output=action
| mvexpand action
| stats count by Workload, action
| sort - count
Name = policy rule name, Data = metadata JSON, AlertId = join key to index=msdefender. Use this query for the Defender-correlated subset; use queries 2.1–2.4 for direct Purview DLP audit events.index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
AlertId=*
| spath
| eval ts = strftime(_time, "%Y-%m-%d")
| eval rule_name = coalesce(Name, "Unknown")
| eval metadata = coalesce(Data, "None")
| eval alert_id = AlertId
| eval workload = coalesce(Workload, "Unknown")
| eval user_upn = coalesce(UserId, "Unknown")
| eval object_id = coalesce(ObjectId, "Unknown")
| search NOT rule_name IN (*bulk*, *DocumentControls*)
| fields ts, _time, rule_name, metadata, alert_id, workload, user_upn, object_id
Source: index=azure (sourcetype="o365:management:activity") — RecordType 13, Operations: SensitivityLabelApplied, SensitivityLabelChanged, SensitivityLabelRemoved.
index=azure sourcetype="o365:management:activity"
Operation IN ("SensitivityLabelApplied","SensitivityLabelChanged","SensitivityLabelRemoved")
| spath
| eval label_before = coalesce(PreviousLabel, "None")
| eval label_after = coalesce(SensitivityLabelId, LabelName, "None")
| eval actor = coalesce(UserId, "System")
| eval source = coalesce(ApplicationDisplayName, ClientAppId, "Unknown")
| eval workload = coalesce(Workload, "Unknown")
| eval ts = strftime(_time, "%Y-%m-%d")
| fields ts, _time, Operation, actor, source, workload,
label_before, label_after, ObjectId
index=azure sourcetype="o365:management:activity"
earliest=-7d
Operation IN ("SensitivityLabelApplied","SensitivityLabelChanged","SensitivityLabelRemoved")
| stats count by Workload, Operation
| sort - count
label_rank.csv with fields label_name and rank (lower number = lower sensitivity).index=azure sourcetype="o365:management:activity"
Operation="SensitivityLabelChanged"
| spath
| eval label_before = coalesce(PreviousLabel, "None")
| eval label_after = coalesce(LabelName, SensitivityLabelId, "None")
| lookup label_rank.csv label_name AS label_before OUTPUT rank AS rank_before
| lookup label_rank.csv label_name AS label_after OUTPUT rank AS rank_after
| where rank_after < rank_before
| fields _time, UserId, label_before, label_after, rank_before, rank_after,
ObjectId, Workload
| sort - _time
index=azure sourcetype="o365:management:activity"
Operation="SensitivityLabelApplied"
| spath
| eval label_method = if(match(coalesce(ApplicationDisplayName,""), "Auto|Service|Policy"),
"auto", "manual")
| stats count by label_method
| eventstats sum(count) as total
| eval pct = round(count/total*100, 1)
| fields label_method, count, pct
Source: idx_ms_o365_retention (environment-dependent — replace with your index name) and RecordType 50 in index=azure. Operations: RetentionPolicyApplied, RecordRetention, FileLocked, RecordUnlock, HoldApplied.
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
| spath
| eval ts = strftime(_time, "%Y-%m-%d")
| eval operation = coalesce(Operation, "Unknown")
| eval policy_name = coalesce(RetentionPolicyName, PolicyName, "Unknown")
| eval policy_id = coalesce(RetentionPolicyId, PolicyId, "Unknown")
| eval label_name = coalesce(RetentionLabelName, LabelName, "Unknown")
| eval workload = coalesce(Workload, "Unknown")
| eval object_id = coalesce(ObjectId, ItemId, "Unknown")
| eval is_record = if(match(operation, "Record|Lock"), "true", "false")
| eval hold_type = coalesce(HoldType, "None")
| fields ts, _time, operation, policy_name, policy_id, label_name,
workload, object_id, is_record, hold_type, UserId
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
Operation IN ("FileLocked","RecordRetention","RecordCreated")
| timechart span=1d count as records_declared
index=azure sourcetype="o365:management:activity"
Operation IN ("HoldApplied","HoldChanged","HoldRemoved")
| spath
| stats count by Operation, CaseId, HoldName, HoldType
| sort - count
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
Operation="RetentionPolicyApplied"
| stats dc(object_id) as items_covered count as events
by RetentionPolicyName, Workload
| sort - items_covered
Environment-dependent index. Replace
idx_ms_purview_insider_risk with the index name confirmed via query 1.1b. Typical sourcetype: ms:o365:management:insiderriskmanagement or events may land in index=azure with RecordType IN (306, 307, 308) depending on your Add-on version. Verify before use.
AlertId, PolicyId, PolicyName, Severity, Status, UserId.index=idx_ms_purview_insider_risk RecordType=307 | spath | eval ts = strftime(_time, "%Y-%m-%d") | eval alert_id = coalesce(AlertId, "Unknown") | eval policy_name = coalesce(PolicyName, "Unknown") | eval severity = coalesce(Severity, "Unknown") | eval status = coalesce(AlertStatus, Status, "Unknown") | eval user_upn = coalesce(UserId, "Anonymous") | fields ts, _time, alert_id, policy_name, severity, status, user_upn
CaseId, CaseName, CaseStatus, PolicyName, Severity.index=idx_ms_purview_insider_risk RecordType=308 | spath | eval ts = strftime(_time, "%Y-%m-%d") | eval case_id = coalesce(CaseId, "Unknown") | eval case_name = coalesce(CaseName, "Unknown") | eval case_status = coalesce(CaseStatus, "Unknown") | eval policy_name = coalesce(PolicyName, "Unknown") | eval severity = coalesce(Severity, "Unknown") | fields ts, _time, case_id, case_name, case_status, policy_name, severity
index=idx_ms_purview_insider_risk RecordType=306 | spath | eval ts = strftime(_time, "%Y-%m-%d") | eval activity = coalesce(Operation, ActivityType, "Unknown") | eval policy_name = coalesce(PolicyName, "Unknown") | eval risk_score = coalesce(RiskScore, 0) | eval sequence_num = coalesce(SequenceId, "None") | fields ts, _time, activity, policy_name, risk_score, sequence_num, UserId
index=idx_ms_purview_insider_risk RecordType=307 earliest=-30d | spath | stats count by PolicyName, Severity, AlertStatus | sort - count
Source: index=msdefender (sourcetype="ms:defender:atp:alerts") for confirmed alert data. Defender incident data uses a environment-dependent index — check via query 1.1b. Ingested via Splunk Add-on for Microsoft Security.
id values include a "dl" prefix (e.g., dl_abc123). The corresponding UAL field in index=azure is AlertId without the prefix. Strip it with | eval AlertId=ltrim(id, "dl") before any join to the Azure index.
idx_ms_defender_incidents with the confirmed index name from query 1.1b. Verify grouping key: the example production query groups by incidentWebUrl, not incidentId. If incidentId is null or inconsistent in your Add-on version, switch the primary key to incidentWebUrl throughout Sections 6–11.index=idx_ms_defender_incidents
| spath
| eval ts = strftime(_time, "%Y-%m-%d")
| eval incident_id = coalesce(incidentId, id, "Unknown")
| eval severity = coalesce(severity, "Unknown")
| eval status = coalesce(status, "Unknown")
| eval classification = coalesce(classification, "Unset")
| eval assigned_to = coalesce(assignedTo, "Unassigned")
| eval detection_source = coalesce(detectionSource, "Unknown")
| eval created_ts = coalesce(createdDateTime, createdTime, _time)
| eval updated_ts = coalesce(lastUpdateDateTime, lastUpdateTime, _time)
| eval age_hours = round((now() - created_ts) / 3600, 1)
| fields ts, _time, incident_id, severity, status, classification,
assigned_to, detection_source, age_hours, created_ts, updated_ts
mvexpand to get one row per entity. Alerts index: index=msdefender sourcetype="ms:defender:atp:alerts". Account identity is at evidence{}.userAccount.accountName / displayName (nested sub-object) — validate against your Add-on version; fallback field evidence{}.userPrincipalName is also extracted.index=msdefender sourcetype="ms:defender:atp:alerts"
| spath
| spath path=evidence{} output=evidence_raw
| mvexpand evidence_raw
| spath input=evidence_raw
| eval ts = strftime(_time, "%Y-%m-%d")
| eval alert_id = coalesce(alertId, id, "Unknown")
| eval incident_id = coalesce(incidentId, "Unlinked")
| eval severity = coalesce(severity, "Unknown")
| eval title = coalesce(title, "Unknown")
| eval entity_type = coalesce('evidence{}.entityType', "Unknown")
| eval account_name = coalesce('evidence{}.userAccount.accountName',
'evidence{}.userPrincipalName', "Unknown")
| eval display_name = coalesce('evidence{}.userAccount.displayName', "Unknown")
| eval device_name = coalesce('evidence{}.deviceDnsName', "Unknown")
| eval ip_address = coalesce('evidence{}.ipAddress', "Unknown")
| fields ts, _time, alert_id, incident_id, severity, title,
entity_type, account_name, display_name, device_name, ip_address
idx_ms_defender_incidents with confirmed index name.index=idx_ms_defender_incidents
status IN ("Active","InProgress","New")
| spath
| stats count by severity
| sort severity
idx_ms_defender_incidents with confirmed index name.index=idx_ms_defender_incidents
status="Resolved"
| spath
| eval created_ts = strptime(coalesce(createdDateTime, createdTime), "%Y-%m-%dT%H:%M:%SZ")
| eval resolved_ts = strptime(coalesce(lastUpdateDateTime, lastUpdateTime), "%Y-%m-%dT%H:%M:%SZ")
| eval ttl_hours = round((resolved_ts - created_ts) / 3600, 2)
| where ttl_hours > 0
| stats avg(ttl_hours) as avg_mttr_hours
median(ttl_hours) as median_mttr_hours
count as resolved_count
by severity
| sort severity
Timestamp, UserId, ActivityType, SensitivityLabel, FileSize, FileName, SiteUrl.index=idx_ms_defender_hunting sourcetype="ms:defender:hunting"
EventType="DataSecurityEvents"
| spath
| eval ts = strftime(_time, "%Y-%m-%d")
| eval activity = coalesce(ActivityType, "Unknown")
| eval label = coalesce(SensitivityLabel, "Unlabeled")
| eval user = coalesce(UserId, "Unknown")
| eval file_name = coalesce(FileName, "Unknown")
| eval site_url = coalesce(SiteUrl, "Unknown")
| fields ts, _time, activity, label, user, file_name, site_url
index=msdefender) to the O365 UAL (index=azure) via AlertId. The ltrim(id, "dl") strips the Defender-side "dl" prefix from alert IDs before the join.Policy scope filter: replace
additionalData.AlertPolicyTitle IN (YourDLPPolicy-1, YourDLPPolicy-2) with the actual DLP policy names configured in your tenant.Service account exclusion: replace the
NOT evidence{}.userAccount.displayName IN (...) list with service/system account display names from your directory.UAL fields:
Name = Rule name · Data = Metadata. Final table excludes bulk-document and document-controls rules.
index="msdefender" sourcetype="ms:defender:atp:alerts"
NOT evidence{}.userAccount.displayName IN (null, ServiceAccount-Example1, ServiceAccount-Example2)
additionalData.AlertPolicyTitle IN (YourDLPPolicy-1, YourDLPPolicy-2)
earliest=-7d@d latest=now
| eval AlertId=ltrim(id, "dl")
| stats earliest(createdDateTime) AS createdDateTime
values("evidence{}.userAccount.accountName") AS accountName
values("evidence{}.userAccount.displayName") AS displayName
values(id) AS id values(AlertId) AS AlertId
values(additionalData.AlertPolicyTitle) AS Policy
BY incidentWebUrl
| join AlertId
[search sourcetype="o365:management:activity" index=azure
earliest=-7d@d latest=now
AlertId IN (
[ search index="msdefender" sourcetype="ms:defender:atp:alerts"
additionalData.AlertPolicyTitle IN (YourDLPPolicy-1, YourDLPPolicy-2)
earliest=-7d@d latest=now
| eval AlertId=ltrim(id, "dl")
| stats values(AlertId) AS AlertId delim=", " values(Data) AS Metadata
| nomv AlertId
| rename AlertId AS search])
| stats values(Name) AS Rule values(Data) AS Metadata BY AlertId]
| search NOT Rule IN (*bulk*, *DocumentControls*)
| sort - createdDateTime
| table createdDateTime displayName accountName Policy Rule incidentWebUrl Metadata
✅ Fixed — Timestamp field:
createdDateTimeAll MTTR and incident time calculations (6.1, 6.4, 7.5, 11.2, 11.4, 14.2, 15.5) now use
coalesce(createdDateTime, createdTime) and coalesce(lastUpdateDateTime, lastUpdateTime). The coalesce fallback means queries will still work if your Add-on version uses the older field name.✅ Fixed — Account entity path:
evidence{}.userAccount.accountNameQuery 6.2 and the 7.6 fact builder now extract
evidence{}.userAccount.accountName and evidence{}.userAccount.displayName (validated against production Add-on output), with a coalesce fallback to evidence{}.userPrincipalName for other Add-on configurations.✅ Fixed — Two DLP schemas in
index=azureA dual-schema notice was added to Section 2. Query 2.5 covers Defender-correlated DLP events (flat
Name/Data fields, AlertId=*). Query 8.1 now excludes AlertId=* events to avoid empty field output when running the nested PolicyDetails{} spath against the wrong event class.⚠️ Needs live verification — Incident grouping key
The example production pattern groups
BY incidentWebUrl. Incident queries (6.1, 7.5, 8.3, 11.*) still use incidentId as the primary key because the incident index is environment-dependent and unconfirmed. Notes have been added to those queries. If incidentId returns null or inflated counts on live data, replace it with incidentWebUrl throughout.⚠️ Subsearch volume limit — monitor in production
The inner subsearch in 6.6 passes trimmed AlertIds via
| nomv AlertId | rename AlertId AS search. Splunk's default subsearch cap is 10,000 rows (limits.conf → [subsearch] maxout). At high Defender alert volume over 7 days, results truncate silently. If that occurs, either raise maxout, shorten the time window, or replace the inner subsearch with a | inputlookup generated from a prior run.
These queries normalize raw events into the seven-family fact model. Run as scheduled saved searches writing to the summary index. Each query produces one row per normalized event.
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
| spath
| spath path=PolicyDetails{} output=pd
| mvexpand pd
| spath input=pd
| rename PolicyDetails{}.PolicyName as policy_name
PolicyDetails{}.PolicyId as policy_id
| spath path=PolicyDetails{}.Rules{} output=rules_raw
| mvexpand rules_raw
| spath input=rules_raw
| rename Rules{}.RuleName as rule_name
| spath path=Rules{}.Actions{} output=action
| mvexpand action
| eval fact_family = "purview_dlp_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval workload = coalesce(Workload, "Unknown")
| eval user_upn = coalesce(UserId, UserKey, "Unknown")
| eval object_id = coalesce(ObjectId, "Unknown")
| eval action_taken = coalesce(action, "None")
| eval label_id = coalesce(SensitivityLabelIds, "Unlabeled")
| eval is_override = if(match(action_taken, "Override"), "true", "false")
| fields fact_family, ts, event_ts, policy_name, policy_id, rule_name,
workload, user_upn, object_id, action_taken, label_id, is_override
index=azure sourcetype="o365:management:activity"
Operation IN ("SensitivityLabelApplied","SensitivityLabelChanged","SensitivityLabelRemoved")
| spath
| eval fact_family = "purview_label_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval operation = Operation
| eval label_before = coalesce(PreviousLabel, "None")
| eval label_after = coalesce(LabelName, SensitivityLabelId, "None")
| eval actor = coalesce(UserId, "System")
| eval label_method = if(match(coalesce(ApplicationDisplayName,""), "Auto|Service|Policy"),
"auto", "manual")
| eval workload = coalesce(Workload, "Unknown")
| eval object_id = coalesce(ObjectId, "Unknown")
| fields fact_family, ts, event_ts, operation, label_before, label_after,
actor, label_method, workload, object_id
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
| spath
| eval fact_family = "purview_retention_lifecycle_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval operation = coalesce(Operation, "Unknown")
| eval policy_name = coalesce(RetentionPolicyName, PolicyName, "Unknown")
| eval label_name = coalesce(RetentionLabelName, LabelName, "Unknown")
| eval workload = coalesce(Workload, "Unknown")
| eval object_id = coalesce(ObjectId, ItemId, "Unknown")
| eval is_record = if(match(operation, "Lock|Record"), "true", "false")
| eval hold_type = coalesce(HoldType, "None")
| fields fact_family, ts, event_ts, operation, policy_name, label_name,
workload, object_id, is_record, hold_type
index=idx_ms_purview_insider_risk
RecordType IN (306, 307, 308)
| spath
| eval fact_family = "purview_insider_risk_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval record_type = RecordType
| eval policy_name = coalesce(PolicyName, "Unknown")
| eval alert_id = coalesce(AlertId, "None")
| eval case_id = coalesce(CaseId, "None")
| eval severity = coalesce(Severity, "Unknown")
| eval status = coalesce(AlertStatus, CaseStatus, Status, "Unknown")
| eval activity = coalesce(Operation, ActivityType, "Unknown")
| eval risk_score = coalesce(RiskScore, 0)
| eval user_upn = coalesce(UserId, "Anonymous")
| fields fact_family, ts, event_ts, record_type, policy_name, alert_id,
case_id, severity, status, activity, risk_score, user_upn
incidentId is consistently populated — if not, use incidentWebUrl as the primary key instead (confirmed stable in the production alerts index — validate in your environment).index=idx_ms_defender_incidents
| spath
| eval fact_family = "defender_incident_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval incident_id = coalesce(incidentId, id, "Unknown")
| eval severity = coalesce(severity, "Unknown")
| eval status = coalesce(status, "Unknown")
| eval classification = coalesce(classification, "Unset")
| eval assigned_to = coalesce(assignedTo, "Unassigned")
| eval detection_source = coalesce(detectionSource, "Unknown")
| eval created_ts = coalesce(createdDateTime, _time)
| eval ttl_hours = if(status="Resolved",
round((now() - strptime(coalesce(lastUpdateDateTime,""),
"%Y-%m-%dT%H:%M:%SZ")) / 3600, 2), null())
| fields fact_family, ts, event_ts, incident_id, severity, status,
classification, assigned_to, detection_source, created_ts, ttl_hours
index=msdefender sourcetype="ms:defender:atp:alerts"
| spath
| spath path=evidence{} output=evidence_raw
| mvexpand evidence_raw
| spath input=evidence_raw
| eval fact_family = "defender_alert_fact"
| eval event_ts = _time
| eval ts = strftime(_time, "%Y-%m-%d")
| eval alert_id = coalesce(alertId, id, "Unknown")
| eval AlertId = ltrim(alert_id, "dl")
| eval incident_id = coalesce(incidentId, "Unlinked")
| eval severity = coalesce(severity, "Unknown")
| eval title = coalesce(title, "Unknown")
| eval status = coalesce(status, "Unknown")
| eval detection_src = coalesce(detectionSource, serviceSource, "Unknown")
| eval account_name = coalesce('evidence{}.userAccount.accountName',
'evidence{}.userPrincipalName', "Unknown")
| eval display_name = coalesce('evidence{}.userAccount.displayName', "Unknown")
| eval policy = coalesce('additionalData.AlertPolicyTitle', "Unknown")
| stats values(account_name) as account_name
values(display_name) as display_name
values(policy) as policy
first(ts) as ts
first(event_ts) as event_ts
first(severity) as severity
first(title) as title
first(status) as status
first(detection_src) as detection_src
first(incident_id) as incident_id
BY fact_family, alert_id, AlertId
| fields fact_family, ts, event_ts, alert_id, AlertId, incident_id, severity,
title, status, detection_src, account_name, display_name, policy
Run as daily scheduled saved searches. Write normalized fact rows into purview_summary. The collect command requires the summary indexing add-on or collect capability.
collect with addtime=f and include a deterministic event_key field derived from stable event identifiers. Query the summary index with dedup event_key downstream to prevent double-counting on re-runs.
PolicyDetails{} schema for direct Purview DLP audit events. Defender-correlated DLP events (flat Name/Data schema, AlertId=*) should be collected separately — see query 2.5 for the extraction pattern.index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
NOT AlertId=*
earliest=-1d@d latest=@d
| spath
| spath path=PolicyDetails{} output=pd
| mvexpand pd
| spath input=pd
| rename PolicyDetails{}.PolicyName as policy_name
PolicyDetails{}.PolicyId as policy_id
| spath path=PolicyDetails{}.Rules{} output=rules_raw
| mvexpand rules_raw
| spath input=rules_raw
| rename Rules{}.RuleName as rule_name
| spath path=Rules{}.Actions{} output=action
| mvexpand action
| eval fact_family = "purview_dlp_fact"
| eval ts = strftime(_time, "%Y-%m-%d")
| eval workload = coalesce(Workload, "Unknown")
| eval user_upn = coalesce(UserId, "Unknown")
| eval action_taken = coalesce(action, "None")
| eval is_override = if(match(action_taken, "Override"), "true", "false")
| eval event_key = md5(ts . policy_id . rule_name . user_upn . coalesce(ObjectId,""))
| fields fact_family, ts, event_key, policy_name, policy_id, rule_name,
workload, user_upn, action_taken, is_override
| collect index=purview_summary addtime=f source="dlp_fact_daily"
index=azure sourcetype="o365:management:activity"
earliest=-1d@d latest=@d
Operation IN ("SensitivityLabelApplied","SensitivityLabelChanged","SensitivityLabelRemoved")
| spath
| eval fact_family = "purview_label_fact"
| eval ts = strftime(_time, "%Y-%m-%d")
| eval label_before = coalesce(PreviousLabel, "None")
| eval label_after = coalesce(LabelName, SensitivityLabelId, "None")
| eval actor = coalesce(UserId, "System")
| eval label_method = if(match(coalesce(ApplicationDisplayName,""), "Auto|Service|Policy"),
"auto", "manual")
| eval event_key = md5(ts . Operation . coalesce(UserId,"") . coalesce(ObjectId,""))
| fields fact_family, ts, event_key, Operation, label_before, label_after,
actor, label_method, Workload
| collect index=purview_summary addtime=f source="label_fact_daily"
index=idx_ms_defender_incidents earliest=-1d@d latest=@d
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
| spath
| eval fact_family = "defender_incident_fact"
| eval ts = strftime(_time, "%Y-%m-%d")
| eval incident_id = coalesce(incidentId, id, "Unknown")
| eval severity = coalesce(severity, "Unknown")
| eval status = coalesce(status, "Unknown")
| eval classification = coalesce(classification, "Unset")
| eval event_key = md5(ts . incident_id)
| fields fact_family, ts, event_key, incident_id, severity, status, classification,
assignedTo, detectionSource
| collect index=purview_summary addtime=f source="incident_fact_daily"
Feeds kpi_purview_health_daily. Audience: Engineering. Confirms each index is receiving data within SLA.
min_expected per index based on your environment's baseline. Alerts when an index falls below the floor.index=azure OR index=msdefender
OR index=idx_ms_o365_retention OR index=idx_ms_purview_insider_risk
OR index=idx_ms_defender_incidents OR index=idx_ms_service_health
earliest=-1d@d latest=@d
`-- Replace env-dependent index names with confirmed values from query 1.1b --`
| stats count by index
| eval min_expected = case(
index="azure", 500,
index="msdefender", 10,
index="idx_ms_o365_retention", 50,
index="idx_ms_purview_insider_risk", 1,
index="idx_ms_defender_incidents", 10,
index="idx_ms_service_health", 1,
true(), 1
)
| eval sla_met = if(count >= min_expected, "PASS", "FAIL")
| fields index, count, min_expected, sla_met
index=azure OR index=msdefender
OR index=idx_ms_o365_retention OR index=idx_ms_purview_insider_risk
OR index=idx_ms_defender_incidents
`-- Replace env-dependent index names with confirmed values from query 1.1b --`
| stats max(_time) as last_event by index
| eval lag_hours = round((now() - last_event) / 3600, 2)
| eval lag_status = case(
lag_hours <= 2, "OK",
lag_hours <= 6, "WARN",
true(), "STALE"
)
| eval last_event_fmt = strftime(last_event, "%Y-%m-%d %H:%M")
| fields index, last_event_fmt, lag_hours, lag_status
| sort - lag_hours
index=idx_ms_service_health
`-- Environment-dependent: replace with confirmed index name from query 1.1b --`
| spath
| search status IN ("ServiceDegradation","ServiceInterruption","Extended Recovery",
"Investigating","ServiceRestored")
| stats count by service, status, classification
| sort - count
Feeds kpi_dlp_effectiveness_daily. Audience: Engineering, SOC. Measures policy block rate, override rate, and top violated rules.
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-7d
| spath
| spath path=PolicyDetails{} output=pd
| mvexpand pd
| spath input=pd
| rename PolicyDetails{}.PolicyName as policy_name
| spath path=PolicyDetails{}.Rules{}.Actions{} output=action
| mvexpand action
| eval action_class = case(
match(action, "BlockAccess|BlockWithOverride"), "block",
match(action, "Override"), "override",
match(action, "Notify|GenerateAlert"), "notify",
true(), "other"
)
| stats count by policy_name, action_class
| eventstats sum(count) as total_events by policy_name
| eval pct = round(count/total_events*100, 1)
| where action_class IN ("block","override")
| fields policy_name, action_class, count, pct, total_events
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-30d
| spath
| spath path=PolicyDetails{} output=pd
| mvexpand pd
| spath input=pd
| rename PolicyDetails{}.PolicyName as policy_name
| spath path=PolicyDetails{}.Rules{} output=rules_raw
| mvexpand rules_raw
| spath input=rules_raw
| rename Rules{}.RuleName as rule_name
| stats count by policy_name, rule_name
| sort - count
| head 10
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-30d
| spath
| spath path=PolicyDetails{}.Rules{}.Actions{} output=action
| mvexpand action
| eval action_class = case(
match(action, "BlockAccess|BlockWithOverride"), "block",
match(action, "Override"), "override",
true(), "other"
)
| where action_class IN ("block","override")
| timechart span=1d count by action_class
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-7d
| spath
| stats count by UserId
| sort - count
| head 20
| rename UserId as user_upn, count as dlp_events
Feeds kpi_investigation_operations_daily. Audience: SOC. Measures open incident queue, MTTR, and unassigned incidents.
index=idx_ms_defender_incidents
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
status IN ("Active","InProgress","New")
| spath
| eval assigned = if(isnotnull(assignedTo) AND assignedTo!="", "Assigned", "Unassigned")
| stats count by severity, assigned
| sort severity, assigned
index=idx_ms_defender_incidents
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
status="Resolved" earliest=-90d
| spath
| eval created_ts = strptime(coalesce(createdDateTime, createdTime), "%Y-%m-%dT%H:%M:%SZ")
| eval resolved_ts = strptime(coalesce(lastUpdateDateTime, lastUpdateTime), "%Y-%m-%dT%H:%M:%SZ")
| eval ttl_hours = round((resolved_ts - created_ts) / 3600, 2)
| where ttl_hours > 0
| eval month = strftime(created_ts, "%Y-%m")
| stats avg(ttl_hours) as avg_mttr_h
median(ttl_hours) as median_mttr_h
count as resolved
by severity, month
| sort month, severity
incidentId is null or "Unlinked") may indicate orphaned signals or Add-on config gaps.index=msdefender sourcetype="ms:defender:atp:alerts"
earliest=-7d
| spath
| eval linked = if(isnotnull(incidentId) AND incidentId!="", "linked", "unlinked")
| stats count by linked
| eventstats sum(count) as total
| eval pct = round(count/total*100,1)
| fields linked, count, pct
index=idx_ms_defender_incidents earliest=-60d
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
| spath
| eval created_ts = strptime(coalesce(createdDateTime, createdTime, ""), "%Y-%m-%dT%H:%M:%SZ")
| eval resolved_ts = if(status="Resolved",
strptime(coalesce(lastUpdateDateTime, lastUpdateTime, ""), "%Y-%m-%dT%H:%M:%SZ"),
null())
| eval day = strftime(created_ts, "%Y-%m-%d")
| stats count as opened by day
| sort day
| streamstats sum(opened) as cumulative_opened
Feeds kpi_retention_lifecycle_daily. Audience: Records Management, Compliance. Measures records declared, hold events, and label coverage.
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
Operation IN ("FileLocked","RecordRetention","RecordCreated")
earliest=-30d
| timechart span=1d count by Workload
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
earliest=-7d
| spath
| eval policy_name = coalesce(RetentionPolicyName, PolicyName, "Unknown")
| stats count by policy_name, Operation
| sort - count
index=azure sourcetype="o365:management:activity"
Operation IN ("HoldApplied","HoldChanged","HoldRemoved")
earliest=-90d
| spath
| eval month = strftime(_time, "%Y-%m")
| stats count by Operation, month
| sort month, Operation
index=azure sourcetype="o365:management:activity"
Operation IN ("RetentionPolicyApplied","SensitivityLabelApplied")
earliest=-30d
| spath
| eval label_name = coalesce(RetentionLabelName, LabelName, "Unlabeled")
| eval is_labeled = if(label_name!="Unlabeled", "labeled", "unlabeled")
| stats dc(ObjectId) as distinct_items by is_labeled
| eventstats sum(distinct_items) as total
| eval pct = round(distinct_items/total*100,1)
Feeds kpi_insider_risk_daily. Audience: Investigations Team Lead, CISO (aggregated view).
index=idx_ms_purview_insider_risk RecordType=307 earliest=-30d | spath | eval severity = coalesce(Severity, "Unknown") | timechart span=1d count by severity
index=idx_ms_purview_insider_risk RecordType=308 | spath | eval case_status = coalesce(CaseStatus, "Unknown") | stats count by case_status, PolicyName | sort - count
index=idx_ms_purview_insider_risk RecordType=306 earliest=-30d | spath | eval activity = coalesce(Operation, ActivityType, "Unknown") | eval policy_name = coalesce(PolicyName, "Unknown") | stats count by activity, policy_name | sort - count
index=idx_ms_purview_insider_risk earliest=-90d
RecordType IN (307, 308)
| spath
| eval event_type = case(
RecordType=307, "alert",
RecordType=308, "case",
true(), "other"
)
| stats count by event_type
| eventstats sum(count) as total
| eval pct = round(count/total*100,1)
| fields event_type, count, pct
Feeds kpi_exec_control_score_monthly. Audience: Leadership. Composite score across DLP block rate, label coverage proxy, incident resolution rate, and pipeline health.
index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=@mon
| spath
| spath path=PolicyDetails{}.Rules{}.Actions{} output=action
| mvexpand action
| eval action_class = case(
match(action, "BlockAccess|BlockWithOverride"), "block",
match(action, "Override"), "override",
true(), "other"
)
| stats count by action_class
| eventstats sum(count) as total
| eval pct = round(count/total*100,1)
| where action_class="block"
| fields action_class, count, pct, total
index=idx_ms_defender_incidents earliest=@mon `-- environment-dependent index: replace with confirmed name from query 1.1b --` | spath | eval res_status = if(status="Resolved", "resolved", "open") | stats count by res_status | eventstats sum(count) as total | eval pct = round(count/total*100,1) | where res_status="resolved" | fields res_status, count, pct, total
index=azure OR index=msdefender
OR index=idx_ms_o365_retention OR index=idx_ms_purview_insider_risk
OR index=idx_ms_defender_incidents OR index=idx_ms_service_health
earliest=-24h
`-- Replace env-dependent index names with confirmed values from query 1.1b --`
| stats count by index
| eval min_expected = case(
index="azure", 500,
index="msdefender", 10,
index="idx_ms_o365_retention", 50,
index="idx_ms_purview_insider_risk", 1,
index="idx_ms_defender_incidents", 10,
index="idx_ms_service_health", 1,
true(), 1
)
| eval sla_met = if(count >= min_expected, 1, 0)
| stats sum(sla_met) as passing count as total_indexes
| eval health_score = round(passing/total_indexes*100, 0)
| fields passing, total_indexes, health_score
index=purview_summary earliest=@mon
source IN ("dlp_fact_daily","incident_fact_daily","label_fact_daily")
| stats
sum(eval(if(fact_family="purview_dlp_fact" AND action_taken="BlockAccess",1,0)))
as dlp_blocks
sum(eval(if(fact_family="purview_dlp_fact",1,0)))
as dlp_total
sum(eval(if(fact_family="defender_incident_fact" AND status="Resolved",1,0)))
as inc_resolved
sum(eval(if(fact_family="defender_incident_fact",1,0)))
as inc_total
sum(eval(if(fact_family="purview_label_fact",1,0)))
as label_events
| eval dlp_score = round(if(dlp_total>0, dlp_blocks/dlp_total*100, 0), 1)
| eval inc_score = round(if(inc_total>0, inc_resolved/inc_total*100, 0), 1)
| eval label_score = if(label_events>0, 100, 0)
| eval composite = round((dlp_score*0.4) + (inc_score*0.4) + (label_score*0.2), 1)
| eval month = strftime(now(), "%Y-%m")
| fields month, dlp_score, inc_score, label_score, composite
| collect index=purview_summary addtime=f source="exec_score_monthly"
Ready-to-paste SPL for Splunk Dashboard Studio or Classic Dashboards. Each panel query reads from KPI summary indexes for fast load times.
index=azure OR index=msdefender
OR index=idx_ms_o365_retention OR index=idx_ms_purview_insider_risk
OR index=idx_ms_defender_incidents OR index=idx_ms_service_health
earliest=-24h
`-- Replace env-dependent index names with confirmed values from query 1.1b --`
| stats count max(_time) as last_event by index
| eval lag_hours = round((now()-last_event)/3600, 1)
| eval status = case(lag_hours<=2,"✅ OK", lag_hours<=6,"⚠️ WARN", true(),"🔴 STALE")
| eval last_event = strftime(last_event,"%Y-%m-%d %H:%M")
| fields index, count, last_event, lag_hours, status
| rename index as Index count as "Events (24h)"
last_event as "Last Event" lag_hours as "Lag (hrs)" status as Status
index=purview_summary source="dlp_fact_daily" earliest=-30d
| dedup event_key
| eval action_class = case(
match(action_taken, "BlockAccess|BlockWithOverride"), "Block",
match(action_taken, "Override"), "Override",
match(action_taken, "Notify|GenerateAlert"), "Notify",
true(), "Other"
)
| timechart span=7d count by action_class
index=purview_summary source="dlp_fact_daily" earliest=-30d | dedup event_key | stats count by rule_name, policy_name | sort - count | head 5 | rename rule_name as "Rule" policy_name as "Policy" count as "Events (30d)"
index=idx_ms_defender_incidents
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
status IN ("Active","InProgress","New")
| spath
| stats count by severity
| appendcols [
search index=idx_ms_defender_incidents status="Resolved" earliest=-7d
| spath
| stats count as resolved_7d ]
| fields severity, count, resolved_7d
index=idx_ms_defender_incidents
`-- environment-dependent index: replace with confirmed name from query 1.1b --`
status="Resolved" earliest=-90d
| spath
| eval created_ts = strptime(coalesce(createdDateTime, createdTime), "%Y-%m-%dT%H:%M:%SZ")
| eval resolved_ts = strptime(coalesce(lastUpdateDateTime, lastUpdateTime), "%Y-%m-%dT%H:%M:%SZ")
| eval ttl_hours = round((resolved_ts-created_ts)/3600, 2)
| where ttl_hours > 0
| eval week = strftime(created_ts, "%Y-W%W")
| stats avg(ttl_hours) as avg_mttr by week, severity
| sort week
index=purview_summary source="exec_score_monthly"
| dedup month sortby -month
| head 1
| fields month, composite, dlp_score, inc_score, label_score
| rename composite as "Composite Score (%)"
dlp_score as "DLP Score"
inc_score as "Incident Score"
label_score as "Label Score"
month as Month
index=azure sourcetype="o365:management:activity"
earliest=-90d
| spath
| eval workload = coalesce(Workload, "Unknown")
| stats count by workload
| sort - count
| head 10
index=idx_ms_o365_retention OR
(index=azure sourcetype="o365:management:activity" RecordType=50)
Operation IN ("FileLocked","RecordRetention","RecordCreated")
earliest=-90d
| timechart span=1d count as records_declared
index=idx_ms_purview_insider_risk RecordType=307 earliest=-30d | spath | eval severity = coalesce(Severity, "Unknown") | eval status = coalesce(AlertStatus, "Unknown") | stats count by severity, status | sort severity, status
index=idx_ms_purview_insider_risk RecordType=308
| spath
| eval policy_name = coalesce(PolicyName, "Unknown")
| eval case_status = coalesce(CaseStatus, "Unknown")
| eval severity = coalesce(Severity, "Unknown")
| stats count by policy_name, severity, case_status
| sort - count
| rename policy_name as Policy severity as Severity
case_status as "Case Status" count as Cases
multisearch when you need to combine events from separate indexes into one table without a union join. Requires Splunk 6.3+.| multisearch
[ search index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention earliest=-7d
| spath
| eval event_type = "DLP"
| eval key_field = coalesce(UserId,"Unknown")
| fields _time, event_type, key_field, Workload ]
[ search index=azure sourcetype="o365:management:activity" earliest=-7d
Operation IN ("SensitivityLabelApplied","SensitivityLabelChanged")
| spath
| eval event_type = "Label"
| eval key_field = coalesce(UserId,"Unknown")
| fields _time, event_type, key_field, Workload ]
| stats count by event_type, Workload
| sort - count
user_dept.csv with fields user_upn, department, team, manager. Upload via Settings → Lookups.index=azure sourcetype="o365:management:activity"
Category=DataLossPrevention
earliest=-30d
| spath
| eval user_upn = coalesce(UserId, "Unknown")
| lookup user_dept.csv user_upn OUTPUT department, team, manager
| stats count by department, team
| sort - count