Master KQL for
Microsoft Sentinel
A complete, structured handbook from beginner to SOC Hunter. Every operator, every function, every threat-hunting pattern — with real Sentinel log examples, interview questions, and practice tasks.
Tabular Data Concept
FOUNDATION
KQL (Kusto Query Language) operates on tabular data: data arranged in rows and columns,
exactly like a spreadsheet or SQL table. Every table in Microsoft Sentinel (like SigninLogs or
SecurityEvent) is a structured dataset where:
- Each row = one security event (one login, one process execution, one alert)
- Each column = a field/attribute (timestamp, username, IP address, status)
KQL queries always take a tabular input, apply transformations using operators, and return a tabular output. This pipeline model makes KQL predictable and composable.
For example, SigninLogs might have millions of rows (each one an Azure AD login). Your query's job is to filter those rows down to the suspicious ones: failed logins from unfamiliar IPs, after-hours access, and so on.

| Key Sentinel Tables | What It Contains | Typical SOC Use |
|---|---|---|
| SigninLogs | Azure AD login events | Brute force, impossible travel |
| SecurityEvent | Windows Event Log (4624, 4625…) | Logon failures, account lockouts |
| DeviceProcessEvents | MDE process executions | Malware, LOLBins, PowerShell abuse |
| DeviceNetworkEvents | Network connections from endpoints | C2, data exfiltration |
| IdentityLogonEvents | Identity-based logon events | Lateral movement |
| SecurityAlert | Alerts generated by Sentinel rules | Triage, correlation |
| AuditLogs | Azure AD directory changes | Admin abuse, privilege escalation |
| CommonSecurityLog | CEF/Syslog from firewalls, proxies | Network security monitoring |
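To make the row-and-column model concrete, the sketch below builds a tiny in-memory table with the datatable operator and pipes it through two operators. The rows and values are invented for illustration; only the column names mirror SigninLogs, and ResultType is assumed to be a string column as in the real table.

```kql
// Hypothetical sample rows standing in for SigninLogs (values are made up)
datatable(TimeGenerated: datetime, UserPrincipalName: string, IPAddress: string, ResultType: string)
[
    datetime(2025-01-15 08:01:00), "alice@contoso.com", "10.0.0.5",    "0",
    datetime(2025-01-15 08:02:00), "bob@contoso.com",   "203.0.113.7", "50126",
    datetime(2025-01-15 08:03:00), "bob@contoso.com",   "203.0.113.7", "50126"
]
| where ResultType != "0"                              // table in, fewer rows out
| project TimeGenerated, UserPrincipalName, IPAddress  // table in, fewer columns out
```

Each step receives a table and returns a table, so the two failed sign-ins for bob@contoso.com come out as a two-row, three-column result.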
Query Structure
FOUNDATION
Every KQL query follows the same pattern: start with a table, then pipe operators one after
another. The pipe character | passes the output of one operator as input to the
next.
Golden Rule: The leftmost item is a tabular source, normally a table name. Everything after it is a transformation.
TableName
| operator1 condition
| operator2 condition
| operator3 condition
Example 1 — Basic structure: all failed logins in last 24h
SecurityEvent // 1. Start: read the table
| where TimeGenerated > ago(24h) // 2. Filter: only last 24 hours
| where EventID == 4625 // 3. Filter: only failed logons
| project TimeGenerated, Account, // 4. Select: only needed columns
IpAddress, Computer
| order by TimeGenerated desc // 5. Sort: newest first
Example 2 — Count failed logins per account
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize FailedAttempts = count() by Account
| where FailedAttempts > 10
| order by FailedAttempts desc
Example 3 — Multi-table overview structure
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType != "0" // ResultType is a string; "0" = success, anything else = failure
| where RiskLevelDuringSignIn in ("high","medium")
| project TimeGenerated, UserPrincipalName,
IPAddress, Location, ResultDescription
| top 50 by TimeGenerated desc
Common mistakes:
- Omitting the | between operators is the #1 beginner error. Each operator MUST be preceded by a pipe.
- Table names are case-sensitive: securityevent will not work; it must be SecurityEvent.
- Starting a query with | where … instead of a table name will throw a parse error.

Interview questions:
- The pipe | passes the output of one operator as the input to the next. It makes KQL composable: you can build complex queries step by step, each operator narrowing or transforming the data. This is the foundation of all KQL queries.
- Compared with SQL, KQL offers built-in time functions such as ago() and bin() that SQL lacks natively.
- Key Sentinel tables: SigninLogs (Azure AD logins, brute force), SecurityEvent (Windows events, privilege abuse), DeviceProcessEvents (process execution, malware), DeviceNetworkEvents (network connections, C2), SecurityAlert (triggered detections, triage).

Practice tasks:
- Write a KQL query that reads SecurityEvent, filters to the last 48 hours, and shows the top 10 rows sorted by time descending.
- Modify the above query to only show EventID 4624 (successful logins) and project only TimeGenerated, Account, Computer, and IpAddress.
- Write a query on SigninLogs for the last 7 days where ResultType is not "0" (failures), showing UserPrincipalName, IPAddress, and ResultDescription.
Comments in KQL
FOUNDATION
KQL supports single-line comments using //. Everything after //
on that line is ignored by the query engine. There are no block comments in KQL.
Comments are critical in SOC work for: documenting detection logic, explaining thresholds, and making queries auditable in detection rule libraries.
// DETECTION: Brute Force - Windows Authentication
// Author: Abbas M | Created: 2025-01 | MITRE: T1110.001
// Threshold: >10 failures in 5 minutes = alert
SecurityEvent
| where TimeGenerated > ago(1h) // Only last hour to reduce noise
| where EventID == 4625 // Failed logon event
// Exclude service accounts and machine accounts (end with $)
| where Account !endswith "$"
| summarize FailCount = count() by Account, IpAddress
| where FailCount > 10 // Brute force threshold
Operator Pipeline
FOUNDATION
The pipeline is KQL's core architecture. Data flows left to right, top to bottom. Each
operator receives the result of the previous operator, transforms it, and passes it forward. This means
order matters — filtering early with where dramatically improves performance
because less data flows through subsequent operators.
❌ Bad Pipeline — filters too late (scans all data first)
SecurityEvent
| project TimeGenerated, Account, EventID, IpAddress, Computer
| summarize count() by Account, EventID
| where EventID == 4625 // Filter applied AFTER aggregation — wastes resources
✅ Good Pipeline — filters early (scans minimal data)
SecurityEvent
| where TimeGenerated > ago(24h) // Time filter FIRST — hits partition pruning
| where EventID == 4625 // Event filter SECOND — reduces rows dramatically
| project TimeGenerated, Account, IpAddress, Computer
| summarize count() by Account
1. Time filter (where TimeGenerated > ago())
2. Indexed field filters (EventID, ResultType)
3. String/regex filters (expensive)
4. project to reduce columns
5. summarize, join, or other aggregations
6. Final sort/top
where
FILTERING
where filters rows based on a boolean condition. It is the most used KQL operator and the
primary tool for reducing data volume. Only rows where the condition evaluates to true pass
through to the next operator.
In SOC investigations: Every detection query starts with a where to narrow
from millions of events down to the suspicious subset. Precise where conditions = fewer false
positives.
TableName
| where <condition>
// Operators: == != > < >= <= and or not in !in has contains startswith endswith matches regex
Example 1 — Multiple conditions with AND/OR
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID in (4625, 4648, 4768, 4771) // Failed logon, explicit-credential logon, Kerberos TGT request, Kerberos pre-auth failure
| where Account != "SYSTEM" and Account != ""
| where Computer contains "DC" // Assumes DC hostnames contain "DC"; has "DC" would miss names like DC01
| project TimeGenerated, EventID, Account, IpAddress, Computer
Example 2 — IN operator for multiple values
DeviceProcessEvents
| where TimeGenerated > ago(48h)
| where FileName in~ ("powershell.exe","pwsh.exe","cmd.exe","wscript.exe","cscript.exe")
// in~ is case-insensitive version of in
| where ProcessCommandLine has_any ("-enc","-encoded","-nop","-bypass","IEX","Invoke-Expression")
| project TimeGenerated, DeviceName, AccountName, FileName, ProcessCommandLine
Example 3 — Negation and complex conditions
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType != 0 // Exclude successful logins
| where not(IPAddress startswith "10.") // Exclude internal IPs
| where not(IPAddress startswith "192.168.")
| where UserPrincipalName !endswith "@contoso.com" or AppDisplayName has "Legacy"
| project TimeGenerated, UserPrincipalName, IPAddress, ResultDescription, Location
Common mistakes:
- Use == (double equals) for comparison, not =. A single = is an assignment operator (used in let or extend).
- == is case-sensitive. Use =~ for case-insensitive equality, and in~ as the case-insensitive form of in. has and contains are already case-insensitive; their case-sensitive forms are has_cs and contains_cs.

Performance tip: always put where TimeGenerated > ago() as the first filter. Sentinel partitions data by time, so time filtering unlocks massive performance gains.

SOC use cases:
- Alert triage: where AlertSeverity in ("High","Critical") to focus on priority alerts
- Entity filtering: Investigate a specific user with where UserPrincipalName == "john@corp.com"
- Exclusion tuning: Reduce false positives with where Computer !in (known_safe_list)
- Time-windowed hunting: where TimeGenerated between (datetime(2025-01-01) .. datetime(2025-01-02))

Interview questions:
Q: What is the difference between has and contains in a where clause?
A: has matches whole terms (faster, uses the term index). contains matches any substring (slower, no index). Terms are runs of alphanumeric characters, so has "powershell" matches "powershell.exe" (the dot ends the term) but not "WindowsPowerShell", whereas contains "powershell" matches both. In SOC queries, prefer has for indexed performance; use contains only when substring matching is required.
- To filter on several values at once, use the in operator: where EventID in (4624, 4625, 4648, 4672). This is more readable than chaining multiple or conditions and is generally at least as efficient.

Practice tasks:
- Write a query on SecurityEvent filtering for EventID 4625 in the last 6 hours, excluding accounts that end with $ (machine accounts), and showing only events where IpAddress is not empty.
- Write a query on SigninLogs filtering for failed sign-ins (ResultType != "0") from outside the US (Location does not contain "US") in the last 24 hours.
- Write a query on DeviceProcessEvents for processes named mimikatz.exe OR processes with sekurlsa in their command line, in the last 7 days.
search
FILTERING
search is a broad, cross-column (and optionally cross-table) text search. It looks for a term
in ALL columns of a table simultaneously. It's the equivalent of Ctrl+F across your entire dataset.
When to use it: At the start of an investigation when you don't know which column an IOC (like an IP address or hash) might appear in. Do not use in production detection rules — it's slow because it scans every column.
// Single table search
SecurityEvent
| search "185.220.101.42" // Find this IP in any column
// Cross-table search (very slow — use only for initial IOC hunting)
search in (SecurityEvent, SigninLogs, DeviceNetworkEvents) "185.220.101.42"
// Search with wildcard
SecurityEvent
| search "mimik*" // Matches mimikatz, mimidrv, etc.
// Case-sensitive search, narrowed by a time filter first (faster than an unconstrained search)
SecurityEvent
| where TimeGenerated > ago(1h)
| search kind=case_sensitive "PASS-THE-HASH"
Example — IOC hunting across tables
// Threat Intel IOC: Hunt for malicious domain across all log sources
search in (DeviceNetworkEvents, CommonSecurityLog, DnsEvents)
"evil-c2-domain.ru"
| where TimeGenerated > ago(30d)
| project TimeGenerated, $table // $table shows which table matched
Warning: search without a time filter on a large workspace can time out. Always add where TimeGenerated > ago() before or after search.
Q: When should you use search vs where?
A: Use search at the start of an investigation when you have an IOC (IP, hash, domain) and don't know which column or even which table it might appear in. It's a discovery tool. Switch to where (specifying the exact column) once you know where the data is; where is far more performant and appropriate for detection rules.
take / limit
FILTERING
take N and limit N are identical operators: they return N arbitrary
rows from the dataset. They do NOT guarantee order (use top if you want sorted
results).
Primary use: Data exploration — quickly see the schema and sample values of an unfamiliar table.
// Explore a table you've never used before
SigninLogs | take 10
// Check what columns DeviceProcessEvents has
DeviceProcessEvents | take 5
// Limit after a complex query to cap result size
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| take 100 // Returns any 100 matching rows (not necessarily the latest)
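The difference between arbitrary and ordered results is easiest to see on a tiny table. The following is a minimal sketch using an in-memory datatable with made-up rows; the Account values are hypothetical.

```kql
// Three invented events, deliberately stored out of time order
datatable(TimeGenerated: datetime, Account: string)
[
    datetime(2025-01-15 09:00:00), "alice",
    datetime(2025-01-15 11:00:00), "bob",
    datetime(2025-01-15 10:00:00), "carol"
]
| top 2 by TimeGenerated desc   // bob (11:00) first, then carol (10:00)
```

Replacing the last line with `| take 2` would still return two rows, but which two is not guaranteed.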
Common mistake: using take and expecting the latest events. take returns arbitrary rows; use top N by TimeGenerated desc if you want the most recent events.
distinct
FILTERING
distinct returns unique combinations of the specified columns, equivalent to SQL's
SELECT DISTINCT. Use it to enumerate unique values in a field, which is essential during
investigation scoping.
// Which unique accounts had failed logins today?
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| distinct Account
// Which unique source IPs hit the firewall in the last hour?
CommonSecurityLog
| where TimeGenerated > ago(1h)
| where DeviceAction == "Deny"
| distinct SourceIP, DestinationIP, DestinationPort
// How many unique users logged into each server today?
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4624
| distinct Account, Computer
| summarize UniqueUsers = count() by Computer
| order by UniqueUsers desc
In SOC investigations, distinct is used to quickly scope: "how many unique users were affected?" or "which unique IPs sent this traffic?" before building a full investigation query.
Q: What is the difference between distinct and summarize dcount()?
A: distinct returns unique rows, that is, the actual values. summarize dcount() returns a count of unique values. Use distinct when you want to SEE the unique values; use summarize dcount() when you just need the number of unique values.
project
FILTERING
project selects specific columns to include in the output (like SQL's
SELECT col1, col2). It reduces output width, improves readability, and improves performance by
carrying only needed data through the pipeline.
You can also create new computed columns inline with project.
// Basic column selection
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| project TimeGenerated, Account, IpAddress, Computer, EventID
// Rename column inline with project
SigninLogs
| where TimeGenerated > ago(24h)
| project
EventTime = TimeGenerated, // rename
User = UserPrincipalName, // rename
SourceIP = IPAddress, // rename
Country = Location,
FailureReason = ResultDescription
// Create computed column inline
// (CommonSecurityLog is used here because it exposes a SentBytes column)
CommonSecurityLog
| where TimeGenerated > ago(24h)
| project
    TimeGenerated,
    SourceIP,
    DestinationIP,
    DestinationPort,
    SentBytes,
    DataMB = round(toreal(SentBytes) / 1048576, 2) // Convert bytes to MB
Q: What is the difference between project and extend?
A: project selects specific columns (others are dropped). extend adds new columns while keeping ALL existing columns. Use project when you want to control the final column set; use extend when you want to add computed fields without losing others.
project-away / project-rename / project-reorder
FILTERING
// project-away: drop specific columns, keep everything else
// Useful when table has many columns and you want to remove noise columns
SecurityEvent
| where TimeGenerated > ago(1h)
| project-away TenantId, MG, SourceSystem, _ResourceId // Drop internal/noisy columns
// project-rename: rename columns, keep ALL others
SigninLogs
| where TimeGenerated > ago(24h)
| project-rename
SourceIP = IPAddress,
UserName = UserPrincipalName,
SignInResult = ResultType
// project-reorder: move specified columns to front, keep all others
SecurityEvent
| where TimeGenerated > ago(1h)
| project-reorder TimeGenerated, EventID, Account, Computer, *
// TimeGenerated, EventID, Account, Computer appear first, then all other columns (*)
| Operator | Keeps all cols? | Drops cols? | Renames? | Reorders? |
|---|---|---|---|---|
| project | ❌ Only listed | ✅ Others dropped | ✅ Inline | ✅ Listed order |
| project-away | ✅ Yes | ✅ Listed dropped | ❌ | ❌ |
| project-rename | ✅ Yes | ❌ | ✅ | ❌ |
| project-reorder | ✅ Yes | ❌ | ❌ | ✅ Listed to front |
sort / order by
SORTING
sort by and order by are identical: they sort the result set by one or more
columns in ascending (asc) or descending (desc) order. The default is
desc. With desc, null values go to the end; with asc, they come first.
// Sort failed logins by time, newest first
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| sort by TimeGenerated desc
// Sort by multiple columns: most failures first, then alphabetically
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| summarize FailCount = count() by Account, IpAddress
| sort by FailCount desc, Account asc
// order by is identical to sort by
SigninLogs
| where TimeGenerated > ago(7d)
| order by TimeGenerated desc
top
SORTING
top N by column is shorthand for sort by column desc | take N. It returns the
top N rows sorted by a column, and is typically more efficient than sorting and then taking on large datasets.
// Top 10 accounts with most failed logins
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| summarize FailCount = count() by Account
| top 10 by FailCount desc
// Top 5 source IPs generating the most traffic
// (CommonSecurityLog is used because it exposes SentBytes)
CommonSecurityLog
| where TimeGenerated > ago(1h)
| summarize TotalBytes = sum(SentBytes) by SourceIP
| top 5 by TotalBytes desc
// Top 20 most recent security alerts
SecurityAlert
| where TimeGenerated > ago(7d)
| top 20 by TimeGenerated desc
Performance tip: top is more efficient than sort | take because the query engine can use a heap data structure internally instead of fully sorting the dataset.
Q: What is the difference between top, take, and sort + take?
A: take returns arbitrary rows with no ordering guarantee. sort + take sorts all data then takes N rows (expensive for large datasets). top N by column efficiently returns N sorted rows using internal optimization; use this in production queries for "most common" or "most recent" patterns.
summarize
AGGREGATION
summarize is KQL's aggregation operator, equivalent to SQL's GROUP BY. It
collapses multiple rows into grouped summaries using aggregate functions like count(),
sum(), dcount(), avg(), etc.
In SOC work: Summarize is essential for detection — counting events per user, per IP, per
hour to identify anomalies. Almost every useful detection rule uses summarize.
TableName
| summarize AggFunction(Column) by GroupingColumn1, GroupingColumn2
Example 1 — Basic count: failed logins per account
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize
FailedLogins = count(),
DistinctIPs = dcount(IpAddress),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by Account, Computer
| where FailedLogins > 10
| order by FailedLogins desc
Example 2 — Multiple aggregations by time bucket
SecurityAlert
| where TimeGenerated > ago(7d)
| summarize
TotalAlerts = count(),
HighSeverity = countif(AlertSeverity == "High"),
MediumSeverity = countif(AlertSeverity == "Medium")
by bin(TimeGenerated, 1h), AlertName
| order by TimeGenerated desc
Example 3 — Multi-level grouping
SigninLogs
| where TimeGenerated > ago(30d)
| where ResultType == "0" // Successful only (ResultType is a string)
| summarize LoginCount = count() by UserPrincipalName, tostring(LocationDetails.countryOrRegion)
| where LoginCount > 1
| order by LoginCount desc
Common mistakes:
- After summarize, only the grouped-by columns and aggregated columns exist. You can't reference other columns from the original table after a summarize without re-joining.
- count() counts all rows (including duplicates) and takes no argument. dcount() counts distinct/unique values. For "how many unique IPs" use dcount(IpAddress), not count().

Interview questions:
- Brute force detection: query SecurityEvent for EventID 4625 (failed logon). Use summarize count() by Account, IpAddress, bin(TimeGenerated, 5m) to count failures per account per IP in 5-minute windows. Filter for windows where the count exceeds a threshold (e.g., >10). This catches rapid, repeated login failures that indicate a brute force pattern.
Q: What is countif() and when do you use it?
A: countif(condition) counts only rows where the condition is true, without needing a separate where. Example: summarize HighAlerts = countif(Severity=="High"), LowAlerts = countif(Severity=="Low") by Source gives counts for each severity in a single aggregation step.

Practice tasks:
- Write a query to find accounts with more than 5 failed logins AND at least 1 successful login within the same hour (potential successful brute force) using SecurityEvent.
- Using SecurityAlert, summarize alert counts by AlertName and AlertSeverity for the last 7 days. Show only alert types with more than 3 occurrences.
- Using DeviceProcessEvents, find the top 10 most frequently executed process names in the last 24 hours, along with the count of unique devices running each process.
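The 5-minute windowing pattern that this section describes for brute force detection can be tried without a live workspace. The sketch below is a minimal example on an in-memory datatable; all rows are invented, and the threshold of 3 is chosen only so the tiny sample produces a hit (the text suggests a value such as >10 for real data).

```kql
// Invented failed-logon events; column names mirror SecurityEvent
datatable(TimeGenerated: datetime, Account: string, IpAddress: string, EventID: int)
[
    datetime(2025-01-15 10:00:10), "alice", "203.0.113.7",  4625,
    datetime(2025-01-15 10:01:20), "alice", "203.0.113.7",  4625,
    datetime(2025-01-15 10:03:45), "alice", "203.0.113.7",  4625,
    datetime(2025-01-15 10:07:00), "alice", "203.0.113.7",  4625,
    datetime(2025-01-15 10:02:00), "bob",   "198.51.100.9", 4625
]
| where EventID == 4625
| summarize FailCount = count() by Account, IpAddress, bin(TimeGenerated, 5m)
| where FailCount >= 3   // demo threshold only
// Result: one row, alice / 203.0.113.7 / 10:00 window / FailCount = 3
```

Note that bin() produces fixed windows, so a burst that straddles a boundary (here the 10:07 event falls into the 10:05 window) is split across two buckets. That is one reason to tune the window size and threshold together.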
count() / dcount()
AGGREGATION

| Function | Description | Use Case |
|---|---|---|
| count() | Count all rows in group | Total event count |
| countif(cond) | Count rows where condition is true | Conditional counts in one step |
| dcount(col) | Distinct count (approximate) | Unique users, unique IPs |
| dcountif(col,cond) | Distinct count with filter | Unique failed-login IPs |
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID in (4624, 4625)
| summarize
TotalEvents = count(),
FailedLogins = countif(EventID == 4625),
SuccessfulLogins = countif(EventID == 4624),
UniqueAccounts = dcount(Account),
UniqueIPs = dcount(IpAddress),
UniqueFailedIPs = dcountif(IpAddress, EventID == 4625)
by Computer
| order by FailedLogins desc
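As a quick comparison of the counting functions above, the following sketch runs count(), dcount(), and the exact count_distinct() aggregation side by side on a small invented dataset. The Computer and IpAddress values are hypothetical.

```kql
// Invented rows: SRV01 sees the same IP twice plus one other IP
datatable(Computer: string, IpAddress: string)
[
    "SRV01", "203.0.113.7",
    "SRV01", "203.0.113.7",
    "SRV01", "198.51.100.9",
    "SRV02", "203.0.113.7"
]
| summarize
    TotalRows = count(),                    // SRV01 = 3, SRV02 = 1
    ApproxIPs = dcount(IpAddress),          // approximate distinct count
    ExactIPs  = count_distinct(IpAddress)   // exact: SRV01 = 2, SRV02 = 1
  by Computer
```

On a sample this small the approximate and exact counts are expected to agree; the two only diverge at high cardinalities, where dcount() trades a little accuracy for speed.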
dcount() uses HyperLogLog, so it's an approximation with a small error (on the order of 1 to 2 percent) but
extremely fast for large datasets. For exact distinct counts, use count_distinct(column)
(slower). In SOC work, the approximation is usually sufficient.
sum() / avg() / min() / max()
AGGREGATION
// Data exfiltration detection: total bytes per destination
CommonSecurityLog // Firewall/proxy logs expose a SentBytes column
| where TimeGenerated > ago(24h)
| where DestinationPort !in (80, 443, 53) // Exclude common ports
| summarize
    TotalBytesSent = sum(SentBytes),
    AvgBytesPerConn = avg(SentBytes),
    MinBytes = min(SentBytes),
    MaxBytes = max(SentBytes),
    ConnectionCount = count()
  by SourceIP, DestinationIP
| where TotalBytesSent > 104857600 // Flag if >100MB sent to single IP
| order by TotalBytesSent desc
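// Response time analysis (useful for WAF/proxy logs)
// AdditionalExtensions is a key=value string, so extract the number before aggregating.
// "responseTime" is a hypothetical key name; substitute the key your device emits.
CommonSecurityLog
| where TimeGenerated > ago(1h)
| extend ResponseTime = toint(extract(@"responseTime=(\d+)", 1, AdditionalExtensions))
| summarize
    AvgResponseTime = avg(ResponseTime),
    MaxResponseTime = max(ResponseTime)
  by DestinationHostName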
percentile() / percentiles()
AGGREGATION
percentile(col, p) returns the p-th percentile of a numeric column.
percentiles(col, p1, p2, ...) returns multiple percentiles in one call. Essential for anomaly
detection — flagging events that fall outside the normal distribution.
// Find source hosts with abnormally large network connections
// (CommonSecurityLog is used because it exposes SentBytes)
CommonSecurityLog
| where TimeGenerated > ago(7d)
| summarize
    p50 = percentile(SentBytes, 50),
    p95 = percentile(SentBytes, 95),
    p99 = percentile(SentBytes, 99)
  by SourceIP
| where p99 > 10000000 // Host sometimes sends >10MB in single connection
// Baseline normal login hours and flag outliers
SigninLogs
| where TimeGenerated > ago(30d)
| where ResultType == "0" // Successful only (ResultType is a string)
| extend Hour = hourofday(TimeGenerated)
| summarize percentiles(Hour, 5, 95) by UserPrincipalName
// Shows the 5th-95th percentile login hours for each user — logins outside this = anomaly
make_list() / make_set()
AGGREGATION
make_list(col) creates a JSON array of all values in a group (with duplicates).
make_set(col) creates a JSON array of unique values only. Both are incredibly useful in SOC
work to collect all related entities into a single row for a user or device.
// Collect all IPs a user logged in from (unique) in a session investigation
SigninLogs
| where TimeGenerated > ago(7d)
| where UserPrincipalName == "suspect@corp.com"
| summarize
UniqueIPs = make_set(IPAddress),
UniqueCountries = make_set(tostring(LocationDetails.countryOrRegion)),
AllApps = make_list(AppDisplayName),
LoginCount = count()
by UserPrincipalName
// Collect all processes run by a suspicious account
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where AccountName == "compromised_user"
| summarize
ProcessesRun = make_set(FileName),
CommandLines = make_list(ProcessCommandLine),
DevicesTouched = make_set(DeviceName)
by AccountName
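To see the duplicate handling of the two functions directly, here is a minimal sketch on an in-memory datatable. The user and IP values are invented; array_length() is used only to make the difference visible as numbers.

```kql
// Invented sign-ins: the same IP appears twice
datatable(UserPrincipalName: string, IPAddress: string)
[
    "suspect@corp.com", "203.0.113.7",
    "suspect@corp.com", "203.0.113.7",
    "suspect@corp.com", "198.51.100.9"
]
| summarize
    AllIPs    = make_list(IPAddress),   // keeps duplicates
    UniqueIPs = make_set(IPAddress)     // duplicates removed
  by UserPrincipalName
| extend
    AllCount    = array_length(AllIPs),     // 3
    UniqueCount = array_length(UniqueIPs)   // 2
```

The order of elements inside a make_set() result should not be relied on.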
In investigations, make_set() lets you see ALL unique
IPs/processes/countries associated with a user in a single investigation row, which is ideal for building a user
timeline or entity profile during incident response.
arg_max() / arg_min()
AGGREGATION
arg_max(col, *) returns the entire row where col is at its maximum value within
each group. This is different from max(col) which only returns the maximum value. Use
arg_max to get the "most recent event" or "highest severity alert" for each entity WITH all its
original fields.
// Get the MOST RECENT login event for each user (with full row details)
SigninLogs
| where TimeGenerated > ago(7d)
| summarize arg_max(TimeGenerated, *) by UserPrincipalName
// Returns one row per user = their most recent login with ALL columns intact
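// Get the highest-severity alert per device (with full alert details)
// AlertSeverity is a string, so rank it numerically first;
// arg_max on the raw string would compare alphabetically ("Medium" > "High")
SecurityAlert
| where TimeGenerated > ago(24h)
| extend SeverityRank = case(AlertSeverity == "High", 3, AlertSeverity == "Medium", 2, AlertSeverity == "Low", 1, 0)
| summarize arg_max(SeverityRank, *) by CompromisedEntity
// Returns one row per entity = their worst alert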
// Get the first process execution (earliest) per device per process name
DeviceProcessEvents
| where TimeGenerated > ago(7d)
| summarize arg_min(TimeGenerated, *) by DeviceName, FileName
// arg_min = row with MINIMUM value (earliest timestamp)
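The "whole row, not just the value" behaviour can be checked on a few invented rows. This is a minimal sketch; the users and IP addresses are hypothetical.

```kql
// Invented sign-ins: alice has two events, bob has one
datatable(TimeGenerated: datetime, UserPrincipalName: string, IPAddress: string)
[
    datetime(2025-01-15 08:00:00), "alice@contoso.com", "10.0.0.5",
    datetime(2025-01-15 17:30:00), "alice@contoso.com", "203.0.113.7",
    datetime(2025-01-15 09:15:00), "bob@contoso.com",   "10.0.0.8"
]
| summarize arg_max(TimeGenerated, *) by UserPrincipalName
// alice -> the 17:30 row, including IPAddress 203.0.113.7
// bob   -> the 09:15 row, including IPAddress 10.0.0.8
```

Using max(TimeGenerated) instead would return the same timestamps but drop the IPAddress column.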
Q: What is the difference between max(col) and arg_max(col, *)?
A: max(col) returns only the maximum VALUE of a column. arg_max(col, *) returns the ENTIRE ROW where that column has its maximum value. Use arg_max when you need "the latest event with all its fields", a very common SOC requirement for getting the current state of an entity.
ago() / now()
TIME
ago(timespan) returns a datetime equal to the current UTC time minus the specified duration.
now() returns the current UTC time. These are the foundation of all time-based filtering in
Sentinel.
| Timespan Unit | Abbreviation | Example |
|---|---|---|
| Minutes | m | ago(30m) |
| Hours | h | ago(6h) |
| Days | d | ago(7d) |
| Weeks | (no unit; use days) | ago(14d) |
| Fractional or combined | decimal value or plain minutes | ago(1.5h) or ago(90m) |
// Last 24 hours
SecurityEvent | where TimeGenerated > ago(24h)
// Last 30 minutes (live SOC monitoring)
SecurityAlert | where TimeGenerated > ago(30m)
// Between two specific times
SigninLogs
| where TimeGenerated between (ago(7d) .. ago(1d)) // 7 days ago to 1 day ago
// Use now() for relative calculations
SecurityEvent
| where TimeGenerated > ago(1h)
| extend MinutesAgo = datetime_diff('minute', now(), TimeGenerated)
// Exact datetime range
SecurityEvent
| where TimeGenerated between (datetime(2025-01-15 08:00:00) .. datetime(2025-01-15 18:00:00))
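Timespan literals can be inspected on their own with print, which is a convenient way to confirm what a value passed to ago() means before using it in a filter. The column names below are illustrative only.

```kql
// Timespan literals evaluated without touching any table
print
    NinetyMinutes   = 90m,
    OneAndHalfHours = 1.5h,
    SameDuration    = (90m == 1.5h),   // true
    TwoWeeks        = 14d
```

Because 90m and 1.5h are the same duration, ago(90m) and ago(1.5h) select the same time window.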
datetime() / format_datetime()
TIME
// datetime() creates a datetime literal
let incidentStart = datetime(2025-01-15 14:23:00);
let incidentEnd = datetime(2025-01-15 18:00:00);
SecurityEvent
| where TimeGenerated between (incidentStart .. incidentEnd)
// format_datetime() formats for display
SecurityEvent
| where TimeGenerated > ago(24h)
| extend FormattedTime = format_datetime(TimeGenerated, "yyyy-MM-dd HH:mm:ss")
| project FormattedTime, Account, Computer, EventID
// Extract parts of datetime
SigninLogs
| where TimeGenerated > ago(30d)
| extend
LoginHour = hourofday(TimeGenerated),
LoginDay = dayofweek(TimeGenerated), // Returns timespan (0=Sunday)
LoginMonth = monthofyear(TimeGenerated)
| summarize LoginCount = count() by LoginHour
| order by LoginHour asc
startofday() / endofday()
TIME
// Get all events from today only
SecurityEvent
| where TimeGenerated >= startofday(now())
| where TimeGenerated <= endofday(now())
// Yesterday's events
SecurityEvent
| where TimeGenerated between (startofday(ago(1d)) .. endofday(ago(1d)))
// This week's alerts
SecurityAlert
| where TimeGenerated >= startofweek(now())
// Other boundary functions
// startofmonth(datetime) — first moment of a month
// startofyear(datetime) — first moment of a year
// startofhour(datetime) — truncates to the hour
// Practical: alert summary per calendar day
SecurityAlert
| where TimeGenerated > ago(30d)
| summarize DailyAlerts = count() by bin(TimeGenerated, 1d)
| extend DayLabel = format_datetime(TimeGenerated, "yyyy-MM-dd")
| project DayLabel, DailyAlerts
| order by DayLabel desc
bin()
TIME
bin(value, bucketSize) rounds a value down to the nearest multiple of bucketSize.
When applied to TimeGenerated, it groups events into time buckets (5-minute windows, hourly,
daily). This is the foundation of time-series analysis and charting in KQL.
// Failed logins per 5-minute window (brute force timeline)
SecurityEvent
| where TimeGenerated > ago(2h)
| where EventID == 4625
| summarize FailedLogins = count() by bin(TimeGenerated, 5m), Account
| order by TimeGenerated desc
// Hourly alert volume for the past week
SecurityAlert
| where TimeGenerated > ago(7d)
| summarize AlertCount = count() by bin(TimeGenerated, 1h), AlertSeverity
| render timechart
// Daily login trend
SigninLogs
| where TimeGenerated > ago(30d)
| summarize Logins = count() by bin(TimeGenerated, 1d)
| render timechart with (title="Daily Login Volume")
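The rounding-down behaviour of bin() is easy to verify in isolation. The sketch below uses print with a single hard-coded timestamp chosen for illustration.

```kql
// bin() floors a value to the start of its bucket
print
    Bucket5m = bin(datetime(2025-01-15 14:23:45), 5m),   // 2025-01-15 14:20:00
    Bucket1h = bin(datetime(2025-01-15 14:23:45), 1h),   // 2025-01-15 14:00:00
    Bucket1d = bin(datetime(2025-01-15 14:23:45), 1d),   // 2025-01-15 00:00:00
    Number   = bin(17, 5)                                // 15
```

Every event between 14:20:00 and 14:24:59 gets the same Bucket5m value, which is what lets summarize group them into one row.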
Q: What is bin() used for in SOC queries?
A: bin() groups events into fixed time windows, enabling time-series analysis. In SOC work, it's used to detect spikes (brute force), create activity timelines during IR, and power timechart visualizations in Sentinel workbooks. Without bin(), you can't plot data over time in a meaningful way because each event has its own timestamp.
contains / has
STRINGS

| Operator | Behavior | Indexed? | Performance |
|---|---|---|---|
| has "word" | Whole-term match, case-insensitive | ✅ Yes | Fast |
| has_cs "word" | Whole-term match, case-sensitive | ✅ Yes | Fast |
| contains "sub" | Substring anywhere in string, case-insensitive | ❌ No | Slow |
| contains_cs "sub" | Substring anywhere in string, case-sensitive | ❌ No | Slow |
| !has "word" | Does NOT contain whole term | ✅ Yes | Fast |
| !contains "sub" | Does NOT contain substring | ❌ No | Slow |
// has: matches "powershell" as a whole term (terms are delimited by non-alphanumeric characters)
DeviceProcessEvents
| where ProcessCommandLine has "powershell"
// Matches: "powershell -nop -enc ..." ✅
// Matches: "c:\windows\system32\WindowsPowerShell\v1.0\powershell.exe" ✅ (via the "powershell" term before ".exe")
// Does NOT match: "MyPowerShellWrapper.exe" ❌ (no standalone "powershell" term)
// contains: matches anywhere as substring
DeviceProcessEvents
| where ProcessCommandLine contains "powershell"
// Matches all three of the above ✅ (but slower)
// SOC Example: find encoded commands
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName has "powershell" // has is already case-insensitive
| where ProcessCommandLine has_any ("-enc", "-EncodedCommand", "-e ")
| project TimeGenerated, DeviceName, AccountName, ProcessCommandLine
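The term-versus-substring distinction can be tested on sample strings before relying on it in a detection. The sketch below evaluates both operators against three invented command lines; the wrapper executable name is hypothetical.

```kql
// Invented command lines to compare has and contains
datatable(ProcessCommandLine: string)
[
    @"powershell -nop -enc SQBFAFgA",
    @"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe -File a.ps1",
    @"C:\Tools\MyPowerShellWrapper.exe"
]
| extend
    HasTerm      = ProcessCommandLine has "powershell",
    ContainsText = ProcessCommandLine contains "powershell"
// Row 1: HasTerm = true,  ContainsText = true
// Row 2: HasTerm = true,  ContainsText = true   ("powershell" is its own term before ".exe")
// Row 3: HasTerm = false, ContainsText = true   ("MyPowerShellWrapper" is a single term)
```

Row 3 is the case where contains is genuinely needed; for the first two, has gives the same answer at lower cost.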
Performance tip: prefer has over contains when the term is a whole word. This is especially important in large Sentinel
workspaces where a full-text scan can be very slow. Reserve contains for cases where you
truly need mid-word substring matching.
has_any / has_all
STRINGS
has_any(list) returns true if the column contains ANY of the listed terms.
has_all(list) returns true if the column contains ALL listed terms. Both are indexed and fast.
// Hunt for multiple LOLBin indicators in command lines
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where ProcessCommandLine has_any (
"certutil",
"bitsadmin",
"regsvr32",
"mshta",
"rundll32",
"wmic",
"msiexec",
"InstallUtil"
)
| where ProcessCommandLine has_any ("-urlcache", "-decode", "/transfer", "javascript:", "http")
| project TimeGenerated, DeviceName, AccountName, FileName, ProcessCommandLine
// Threat Intel: match known C2 domains
DeviceNetworkEvents
| where TimeGenerated > ago(7d)
| where RemoteUrl has_any (
"evil-domain.ru",
"malware-c2.cn",
"phishing-site.tk"
)
| project TimeGenerated, DeviceName, RemoteUrl, RemoteIP
startswith / endswith
STRINGS
// Exclude machine accounts (end with $)
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| where Account !endswith "$" // Exclude COMPUTERNAME$ accounts
// Find Azure AD guest accounts
SigninLogs
| where UserPrincipalName endswith "#EXT#@contoso.onmicrosoft.com"
// Find processes run from temp directories
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FolderPath startswith @"C:\Users\" and FolderPath contains @"\AppData\Local\Temp"
// @ prefix = raw string (no need to escape backslashes)
// Suspicious executables starting from user-writable paths
DeviceProcessEvents
| where FolderPath startswith @"c:\users\" // startswith is case-insensitive; startswith_cs is the case-sensitive form
| where FileName endswith ".exe"
| where not(FolderPath contains "Program Files")
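The case behaviour of these operators can be confirmed with print. This is a small sketch with hard-coded sample strings; the path and account name are invented.

```kql
// startswith / endswith are case-insensitive; the _cs variants are case-sensitive
print
    Insensitive = @"C:\Users\bob\a.exe" startswith @"c:\users\",      // true
    Sensitive   = @"C:\Users\bob\a.exe" startswith_cs @"c:\users\",   // false
    MachineAcct = "WS01$" endswith "$"                                // true
```

For path filters, the default case-insensitive forms are usually the safer choice because Windows paths are not case-sensitive.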
matches regex
STRINGS
matches regex @"pattern" applies a regular expression match. It's the most powerful but
slowest string operator — use only when no other operator suffices. KQL uses RE2 regex syntax.
// Detect Base64-encoded PowerShell (long base64 strings)
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where ProcessCommandLine matches regex @"-[Ee][Nn][Cc].*[A-Za-z0-9+/]{100,}"
// Matches: -enc, -Enc, -ENC followed by 100+ chars of base64
// Detect IPv4 address in a string field
CommonSecurityLog
| where Message matches regex @"\b(?:\d{1,3}\.){3}\d{1,3}\b"
// Detect suspicious file extensions in email attachments
SecurityAlert
| where ExtendedProperties matches regex @"\.(exe|dll|ps1|vbs|js|bat|cmd|hta)\b" // \b, not $: the extension rarely ends the JSON string
// Detect obfuscated command: letters interspersed with escape characters (^ or %%)
DeviceProcessEvents
| where ProcessCommandLine matches regex @"(\^|%%)[a-zA-Z](\^|%%)[a-zA-Z]"
// Matches cmd.exe obfuscation: p^o^w^e^r^s^h^e^l^l
Apply cheaper filters (has, where EventID ==, time range) BEFORE the regex match to minimize
the rows that need regex evaluation.
substring / split / strcat
STRINGS
// substring(str, startIndex, length)
SecurityEvent
| where TimeGenerated > ago(1h)
| extend DomainShort = substring(Computer, 0, 3) // First 3 chars of hostname
// split(str, delimiter) — returns a dynamic array
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| extend PathParts = split(FolderPath, "\\") // Split path on backslash
| extend LastSegment = tostring(PathParts[-1]) // Last element = final path segment (usually the file name)
// strcat() — concatenate strings
SigninLogs
| where TimeGenerated > ago(7d)
| extend UserAndIP = strcat(UserPrincipalName, " [", IPAddress, "]")
| project TimeGenerated, UserAndIP, ResultDescription
// Real SOC use: extract domain from UPN (user@domain.com)
SigninLogs
| where TimeGenerated > ago(7d)
| extend UPNParts = split(UserPrincipalName, "@")
| extend UPNDomain = tostring(UPNParts[1]) // domain.com
| summarize count() by UPNDomain
| order by count_ desc
replace_string() / tostring() / toupper() / tolower()
STRINGS
// replace_string(str, lookup, replacement)
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| extend NormalizedCmd = replace_string(
tolower(ProcessCommandLine), // tolower() for normalization
"  ", // double space
" " // replace with single space
)
// tostring() — convert any type to string
SigninLogs
| where TimeGenerated > ago(7d)
| extend OS = tostring(DeviceDetail.operatingSystem) // dynamic -> string
// toupper(), tolower() — case normalization for comparisons
SecurityEvent
| where TimeGenerated > ago(1h)
| where tolower(Account) !in ("system", "anonymous logon", "network service")
| project TimeGenerated, Account, EventID
Data Types Overview
TYPES
| Type | Description | Example Literal | SOC Column Example |
|---|---|---|---|
| string | Text | "powershell.exe" | Account, Computer, IPAddress |
| int | 32-bit integer | int(4625) | EventID, Port |
| long | 64-bit integer (default for whole numbers) | 1000000 | SentBytes, ProcessId |
| real | 64-bit float | 3.14 | Scores, percentages |
| bool | Boolean | true / false | IsExternal, IsCompliant |
| datetime | UTC date+time | datetime(2025-01-15) | TimeGenerated |
| timespan | Duration | 1h, 30m | Duration fields |
| dynamic | JSON / array / bag | dynamic({"key":"val"}) | LocationDetails, DeviceDetail |
| guid | Unique identifier | guid("abc-...") | CorrelationId, SessionId |
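The type of any literal or expression can be checked with gettype(). The following one-row sketch uses literals from the table above; the column names are arbitrary.

```kql
print
    IntLiteral  = gettype(int(4625)),               // "int"
    PlainNumber = gettype(4625),                    // "long": unqualified whole numbers default to long
    RealLiteral = gettype(3.14),                    // "real"
    SpanLiteral = gettype(30m),                     // "timespan"
    TimeLiteral = gettype(datetime(2025-01-15)),    // "datetime"
    BagLiteral  = gettype(dynamic({"key":"val"}))   // "dictionary"
```

Running gettype() on a real column (for example `SigninLogs | take 1 | project gettype(ResultType)`) is a cheap way to find out whether a conversion is needed before comparing.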
Type Conversions
TYPES
// tostring(): convert dynamic/numeric to string
SigninLogs
| where TimeGenerated > ago(7d)
| extend CountryStr = tostring(LocationDetails.countryOrRegion)
| summarize count() by CountryStr
// toint() — string "4625" to integer for comparison
CommonSecurityLog
| where toint(DeviceEventClassID) >= 4000 // String field converted for numeric comparison
// tolong(): for large numbers like bytes (byte counters live in firewall/proxy logs)
CommonSecurityLog
| where tolong(SentBytes) > 104857600 // > 100MB
// toreal(): for decimal math
CommonSecurityLog
| extend DataMB = toreal(SentBytes) / 1048576.0
// todatetime(): parse a string timestamp
print ParsedTime = todatetime("2025-01-15T14:23:05Z")
// tobool(): convert string "true"/"false" to bool
print IsActive = tobool("true")
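Text ordering and numeric ordering can disagree, which is why conversion matters before a numeric comparison. The sketch below uses made-up values and needs no tables.

```kql
print
    AsText   = strcmp("999", "4000"),          // 1: as text, "999" sorts after "4000"
    AsNumber = toint("999") > toint("4000"),   // false: 999 is less than 4000
    BadInput = isnull(toint("N/A"))            // true: a failed conversion returns null
```

The last column is worth remembering: rows whose value cannot be converted yield null and silently drop out of a numeric filter.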
When DeviceEventClassID is stored as a string, it compares as text rather than as a number
(as strings, "999" sorts after "4000"). Always convert with toint() or tolong() before
numeric comparisons.
parse_json() / dynamic type
TYPES
Many Sentinel columns store JSON-encoded data (like ExtendedProperties, Entities,
AdditionalData). The dynamic type holds these JSON objects/arrays. Use
parse_json() to convert a JSON string column into a navigable dynamic object.
Access nested fields using dot notation or bracket notation.
// SigninLogs has a dynamic column: LocationDetails
SigninLogs
| where TimeGenerated > ago(7d)
| extend Country = tostring(LocationDetails.countryOrRegion)
| extend City = tostring(LocationDetails.city)
| extend Lat = toreal(LocationDetails.geoCoordinates.latitude)
| extend Lon = toreal(LocationDetails.geoCoordinates.longitude)
| project TimeGenerated, UserPrincipalName, Country, City
// SecurityAlert has ExtendedProperties as JSON string — parse it
SecurityAlert
| where TimeGenerated > ago(24h)
| extend Props = parse_json(ExtendedProperties)
| extend AttackerIP = tostring(Props.AttackerIP)
| extend TargetUser = tostring(Props.TargetUser)
| project TimeGenerated, AlertName, AttackerIP, TargetUser
// Access array elements by index
SecurityAlert
| where TimeGenerated > ago(24h)
| extend Entities = parse_json(Entities)
| extend FirstEntity = tostring(Entities[0].Address) // First entity's IP
Access nested properties with dot notation: column.property.subproperty. For
array elements, use bracket notation: column[0]. Always wrap the access in
tostring() or toint() since dynamic fields are untyped; otherwise comparisons
may fail silently.
parse
PARSING
parse extracts structured fields from an unstructured text string using a pattern with named
placeholders. It's essential when log data arrives as raw text (Syslog, CEF custom fields, event messages)
and you need to extract specific values.
// Raw log message: "User admin logged in from IP 192.168.1.100 at 14:23:05"
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4624
| parse RenderedDescription with
"User " Username " logged in from IP " SourceIP " at " LoginTime
| project TimeGenerated, Username, SourceIP, LoginTime
// Parse firewall log: "src=10.0.0.5 dst=8.8.8.8 dport=443 action=ALLOW"
CommonSecurityLog
| where TimeGenerated > ago(1h)
| parse AdditionalExtensions with
"src=" SrcIP " dst=" DstIP " dport=" DstPort " action=" Action
| project TimeGenerated, SrcIP, DstIP, DstPort, Action
// parse-where: like parse but DROPS rows that don't match the pattern
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| parse-where ProcessCommandLine with * "-enc " EncodedPayload
// Only keeps rows where "-enc " exists in command line, extracting the payload
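The difference between parse and parse-where shows up only when some rows do not fit the pattern. This self-contained sketch uses two invented log lines; the typed placeholder DstPort:int is an optional extra that casts the captured value.

```kql
let Lines = datatable(Msg:string)
[
    "src=10.0.0.5 dst=8.8.8.8 dport=443 action=ALLOW",
    "heartbeat ok"
];
Lines
| parse Msg with "src=" SrcIP " dst=" DstIP " dport=" DstPort:int " action=" Action
// Both rows are returned; the second has empty SrcIP/DstIP/Action and a null DstPort.
// Swapping parse for parse-where would return only the first row.
```

For detections, parse-where is usually the safer choice because unparsed rows cannot leak into later filters with empty values.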
extract() / extract_all()
PARSING
extract(regex, captureGroup, str) extracts a regex capture group from a string.
extract_all(regex, str) returns all matches as a dynamic array.
// Extract IP address from a raw log message
Syslog
| where TimeGenerated > ago(1h)
| extend ExtractedIP = extract(@"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", 1, SyslogMessage)
| where isnotempty(ExtractedIP)
| project TimeGenerated, Computer, ExtractedIP, SyslogMessage
// Extract process name from a full path
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| extend ProcessName = extract(@"\\([^\\]+)$", 1, FolderPath)
// Captures the last segment after the final backslash
// extract_all: find all IPs in a message
Syslog
| where TimeGenerated > ago(1h)
| extend AllIPs = extract_all(@"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})", SyslogMessage)
// Returns a dynamic array of all matching IPs in the message
// Extract event code from Windows audit message
SecurityEvent
| where TimeGenerated > ago(1h)
| extend SubStatus = extract(@"Sub Status:\s+([0-9A-Fx]+)", 1, RenderedDescription)
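As a compact comparison of the two functions, the sketch below runs both against one invented syslog-style message. The optional fourth argument to extract(), typeof(int), casts the capture to an integer.

```kql
print Msg = "Accepted password for root from 203.0.113.7 port 52144 ssh2, previous 198.51.100.9"
| extend
    FirstIP = extract(@"(\d{1,3}(?:\.\d{1,3}){3})", 1, Msg),   // first match only
    Port    = extract(@"port (\d+)", 1, Msg, typeof(int)),     // capture group cast to int
    AllIPs  = extract_all(@"(\d{1,3}(?:\.\d{1,3}){3})", Msg)   // dynamic array of every match
```

FirstIP holds only the first address, while AllIPs holds both and can be passed to mv-expand for per-IP enrichment.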
iff()
CONDITIONAL
iff(condition, trueValue, falseValue) is KQL's ternary/if-else function. Returns one value if
condition is true, another if false. Equivalent to SQL's CASE WHEN ... THEN ... ELSE ... END
for simple two-way conditionals.
// Label logins as success or failure
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID in (4624, 4625)
| extend LoginResult = iff(EventID == 4624, "Success", "Failed")
| project TimeGenerated, Account, LoginResult, IpAddress
// Flag high-risk sign-ins
SigninLogs
| where TimeGenerated > ago(7d)
| extend RiskFlag = iff(RiskLevelDuringSignIn in ("high","medium"), "⚠ RISKY", "Normal")
| project TimeGenerated, UserPrincipalName, RiskFlag, IPAddress
// Flag external vs internal IP
DeviceNetworkEvents
| where TimeGenerated > ago(24h)
| extend IPType = iff(coalesce(ipv4_is_private(RemoteIP), false), // covers 10/8, 172.16/12, 192.168/16
"Internal", "External")
| where IPType == "External"
| summarize ExternalConnections = count() by DeviceName, RemoteIP
| order by ExternalConnections desc
case()
CONDITIONAL
case(cond1, val1, cond2, val2, ..., defaultVal) is the multi-branch conditional, similar to a
switch statement or SQL's multi-WHEN CASE. Use when you have more than two outcomes.
// Classify Windows Event IDs
SecurityEvent
| where TimeGenerated > ago(24h)
| extend EventCategory = case(
EventID == 4624, "Successful Logon",
EventID == 4625, "Failed Logon",
EventID == 4648, "Explicit Credential Logon",
EventID == 4672, "Special Privileges Assigned",
EventID == 4720, "User Account Created",
EventID == 4732, "Member Added to Security Group",
EventID == 4756, "Member Added to Universal Group",
"Other Security Event"
)
| project TimeGenerated, EventID, EventCategory, Account, Computer
| order by TimeGenerated desc
// Classify alerts into triage tiers by severity and incident status
SecurityAlert
| where TimeGenerated > ago(7d)
| extend SeverityTier = case(
AlertSeverity == "High" and IsIncident == true, "Critical - IR Required",
AlertSeverity == "High", "High - Investigate Now",
AlertSeverity == "Medium", "Medium - Review Today",
AlertSeverity == "Low", "Low - Monitor",
"Informational"
)
| summarize count() by SeverityTier
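Because case() returns the value of the first condition that is true, the order of the branches matters. The sketch below uses three invented rows to contrast a correct ordering with one where a broad condition shadows a narrower one.

```kql
datatable(AlertSeverity:string, IsIncident:bool)
[
    "High", true,
    "High", false,
    "Low",  false
]
| extend
    Ordered = case(
        AlertSeverity == "High" and IsIncident, "Critical",
        AlertSeverity == "High",                "High",
        "Other"),
    Shadowed = case(
        AlertSeverity == "High",                "High",      // matches first...
        AlertSeverity == "High" and IsIncident, "Critical",  // ...so this branch is never reached
        "Other")
```

In the first row Ordered is "Critical" but Shadowed is "High", so the most specific conditions belong at the top.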
isempty() / isnotempty() / isnull() / isnotnull()
CONDITIONAL
// Filter out rows with empty IP addresses
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| where isnotempty(IpAddress) // Exclude rows where IpAddress is "" or null
| where IpAddress != "-" // Also exclude placeholder values
// Find alerts that are missing entity information (data quality issue)
SecurityAlert
| where TimeGenerated > ago(7d)
| where isempty(CompromisedEntity)
| project TimeGenerated, AlertName, AlertSeverity
// isnull() / isnotnull() — for numeric fields and dynamic types
SecurityEvent
| where isnotnull(EventID)
| where isnull(ProcessId) // ProcessId is null (not a process event)
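How the two families behave on string and numeric columns can be checked on a two-row table. The values below are invented for illustration.

```kql
datatable(Name:string, Port:int)
[
    "admin", 443,
    "",      int(null)
]
| extend
    NameEmpty = isempty(Name),  // true only for the second row
    NameNull  = isnull(Name),   // false for both rows: string values are never null
    PortNull  = isnull(Port)    // true only for the second row
```

This is why isnull() is not a useful filter on string columns: an empty string is not null.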
isempty() checks for empty string
"" OR null. isnull() checks only for null. For string columns, prefer
isnotempty(). For numeric/datetime columns, use isnotnull().
mv-expand
ARRAYS
mv-expand (multi-value expand) takes a dynamic array column and expands it into
multiple rows — one row per array element. It's the inverse of make_list(). Critical
for processing Sentinel's Entities arrays in SecurityAlert and similar columns.
// Expand the Entities array in SecurityAlert to get individual entity details
SecurityAlert
| where TimeGenerated > ago(24h)
| where AlertSeverity == "High"
| mv-expand Entity = parse_json(Entities) // One row per entity, in a new dynamic column
| extend EntityType = tostring(Entity.Type)
| extend EntityAddress = tostring(Entity.Address) // For IP entities
| extend EntityName = tostring(Entity.Name) // For account entities
| where EntityType in~ ("ip", "account")
| project TimeGenerated, AlertName, EntityType, EntityAddress, EntityName
// Expand a make_list result for further processing
SigninLogs
| where TimeGenerated > ago(7d)
| summarize IPList = make_list(IPAddress) by UserPrincipalName
| mv-expand IPList // One row per IP per user
| extend SingleIP = tostring(IPList)
| distinct UserPrincipalName, SingleIP
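The row multiplication that mv-expand performs can be seen without any workspace data. In this sketch the alert names and entity JSON are invented, and Entities is a JSON string as it is in SecurityAlert.

```kql
datatable(AlertName:string, Entities:string)
[
    "Suspicious sign-in", '[{"Type":"ip","Address":"203.0.113.7"},{"Type":"account","Name":"alice"}]',
    "Malware detected",   '[{"Type":"host","HostName":"WS-01"}]'
]
| mv-expand Entity = parse_json(Entities)    // one output row per array element
| project AlertName,
    EntityType    = tostring(Entity.Type),
    EntityAddress = tostring(Entity.Address),  // empty when the entity has no Address
    EntityName    = tostring(Entity.Name)      // empty when the entity has no Name
```

Two input rows become three output rows, one per entity, with AlertName repeated on each.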
bag_unpack
ARRAYS
bag_unpack(column) expands a dynamic property-bag (JSON object) into individual columns, one
column per key. Use when a JSON object has variable keys you want to turn into queryable columns.
// ExtendedProperties in SecurityAlert is a JSON bag
SecurityAlert
| where TimeGenerated > ago(24h)
| extend Props = parse_json(ExtendedProperties)
| evaluate bag_unpack(Props)
// Creates new columns for each key in ExtendedProperties JSON:
// AttackerIP, TargetUser, Method, etc. (varies by alert type)
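The effect of variable keys is easier to see on a small invented table. In the sketch below the two rows carry different property names, which are hypothetical and not a real alert schema.

```kql
datatable(AlertName:string, ExtendedProperties:string)
[
    "Brute force", '{"AttackerIP":"203.0.113.7","TargetUser":"alice"}',
    "Malware",     '{"FileHash":"abc123","TargetUser":"bob"}'
]
| extend Props = parse_json(ExtendedProperties)
| project-away ExtendedProperties        // drop the raw JSON string
| evaluate bag_unpack(Props)             // one column per distinct key across all rows
```

The result has a column for every key seen in any row, and a row that lacks a key gets an empty value in that column.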
array_length()
ARRAYS
// Count number of entities in each alert
SecurityAlert
| where TimeGenerated > ago(24h)
| extend EntityCount = array_length(parse_json(Entities))
| where EntityCount > 5 // Alerts with many entities may indicate broad compromise
| project TimeGenerated, AlertName, EntityCount, AlertSeverity
| order by EntityCount desc
// Check if user accessed many different resources (lateral movement indicator)
SigninLogs
| where TimeGenerated > ago(24h)
| summarize AppList = make_set(AppDisplayName) by UserPrincipalName
| extend AppCount = array_length(AppList)
| where AppCount > 10 // User accessed more than 10 different apps in one day: unusual
| order by AppCount desc
Join Types Overview
JOINS
KQL's join operator combines two tables based on matching key columns. The join kind controls
which rows are included and how duplicates are handled. The kinds most used in SOC work are listed
below; rightsemi and rightanti also exist as mirror images of leftsemi and leftanti.
| Join Kind | Behavior | SOC Use Case |
|---|---|---|
| inner | All matching pairs (can produce duplicates) | Correlate events where both conditions must exist |
| innerunique | Deduplicates left-side keys, then returns every right-side match (default in KQL) | Standard correlation with fewer duplicate rows |
| leftouter | All left rows + matching right rows (null if no match) | Enrich events, keep unmatched |
| rightouter | All right rows + matching left rows | Rare; usually restructure to leftouter instead |
| fullouter | All rows from both tables | Compare two datasets for differences |
| leftsemi | Left rows that HAVE a match in right | Filter events present in a threat intel list |
| leftanti | Left rows that DON'T have a match in right | Find new/unknown IPs not in allowlist |
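The practical difference between inner and innerunique is the row count when a key repeats on the left side. The sketch below uses two tiny invented tables where the same account appears twice on each side.

```kql
let Fails = datatable(Account:string, FailTime:int)
[
    "alice", 1,
    "alice", 2
];
let Successes = datatable(Account:string, SuccessTime:int)
[
    "alice", 10,
    "alice", 11
];
union
    (Fails | join kind=inner       (Successes) on Account | summarize Rows = count() | extend Kind = "inner"),
    (Fails | join kind=innerunique (Successes) on Account | summarize Rows = count() | extend Kind = "innerunique")
// inner returns 4 rows (2 x 2 pairs); innerunique keeps one left row per key and returns 2
```

Because innerunique is the default, a join written without kind= can drop left-side rows that share a key, which matters when those rows carry distinct timestamps.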
inner / innerunique
JOINS
// Detect: accounts with repeated failed logins AND a successful login from the same IP (possible successful brute force)
let FailedLogins =
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize FailCount = count() by Account, IpAddress
| where FailCount > 5;
let SuccessfulLogins =
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4624
| distinct Account, IpAddress;
FailedLogins
| join kind=innerunique (SuccessfulLogins) on Account, IpAddress
// Returns accounts that BOTH failed more than 5 times AND had a successful login (event order is not checked)
| project Account, IpAddress, FailCount
| extend Risk = "HIGH - Possible Successful Brute Force"
// Find processes that both executed AND made external connections
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName in ("powershell.exe", "cmd.exe", "wscript.exe")
| project DeviceName, InitiatingProcessId = ProcessId, FileName, AccountName
| join kind=inner (
DeviceNetworkEvents
| where TimeGenerated > ago(24h)
| where not(RemoteIP startswith "10.")
| project DeviceName, InitiatingProcessId, RemoteIP, RemotePort
) on DeviceName, InitiatingProcessId
| project DeviceName, FileName, AccountName, RemoteIP, RemotePort
leftouter / fullouter
JOINS
// Enrich logins with threat intel (keep all logins, add TI match if exists)
let ThreatIntelIPs = datatable(MaliciousIP:string, ThreatType:string)
[
"185.220.101.42", "Tor Exit Node",
"91.108.56.12", "Known Scanner",
"45.33.32.156", "C2 Infrastructure"
];
SigninLogs
| where TimeGenerated > ago(24h)
| join kind=leftouter (ThreatIntelIPs) on $left.IPAddress == $right.MaliciousIP
| extend IsMalicious = isnotempty(ThreatType)
| project TimeGenerated, UserPrincipalName, IPAddress, ThreatType, IsMalicious
| order by IsMalicious desc, TimeGenerated desc
leftsemi / leftanti
JOINS
// leftanti: find processes NOT in the known-safe list (NEW/UNKNOWN)
let KnownSafeProcesses = datatable(ProcessName:string)
["svchost.exe","explorer.exe","lsass.exe","csrss.exe","winlogon.exe"];
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName !in ("", " ")
| join kind=leftanti (KnownSafeProcesses) on $left.FileName == $right.ProcessName
// Returns ONLY processes NOT in safe list
| summarize count() by FileName, DeviceName
| order by count_ desc
// leftsemi: find connections TO known malicious IPs (must exist in both)
let MaliciousIPs = datatable(BadIP:string)
["185.220.101.42","91.108.56.12","45.33.32.156"];
DeviceNetworkEvents
| where TimeGenerated > ago(7d)
| join kind=leftsemi (MaliciousIPs) on $left.RemoteIP == $right.BadIP
// Returns only network events where RemoteIP is in MaliciousIPs list
| project TimeGenerated, DeviceName, RemoteIP, RemotePort, InitiatingProcessFileName
What is the difference between leftanti and leftsemi?
leftsemi keeps left rows WHERE a match EXISTS in the right table; use it for
indicator matching (is this IP in my threat intel list?). leftanti keeps left rows WHERE NO match
exists; use it for baseline exclusion (show me events NOT in my known-safe list). Both are used heavily
in SOC detection rules for threat intel matching and noise reduction.
let
ADVANCED
let creates named variables or subquery results that can be reused later in the same query.
It's the foundation of readable, modular, and maintainable detection rules. Variables can hold scalar
values, expressions, or entire tabular results (subqueries).
// Scalar variable
let LookbackPeriod = 24h;
let BruteForceThreshold = 10;
SecurityEvent
| where TimeGenerated > ago(LookbackPeriod)
| where EventID == 4625
| summarize FailCount = count() by Account, IpAddress
| where FailCount > BruteForceThreshold
// Multi-stage detection using let
let FailedLogins =
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| where Account !endswith "$"
| summarize FailCount = count(), IPs = make_set(IpAddress) by Account
| where FailCount > 10;
let AdminAccounts =
SecurityEvent
| where TimeGenerated > ago(1d)
| where EventID == 4672 // Special privileges assigned
| distinct Account;
// Combine: find admin accounts that are being brute forced
FailedLogins
| join kind=inner (AdminAccounts) on Account
| extend Risk = "CRITICAL: Admin Account Under Brute Force"
| project Account, FailCount, IPs, Risk
// Inline threat intel table using datatable
let SuspiciousProcesses = datatable(ProcessName:string, ThreatType:string)
[
"mimikatz.exe", "Credential Theft Tool",
"procdump.exe", "LSASS Dumping Tool",
"bloodhound.exe", "AD Enumeration Tool",
"sharphound.exe", "AD Enumeration Tool",
"rubeus.exe", "Kerberos Attack Tool",
"cobalt strike", "C2 Framework"
];
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| join kind=inner (SuspiciousProcesses) on $left.FileName == $right.ProcessName
| project TimeGenerated, DeviceName, AccountName, FileName, ThreatType, ProcessCommandLine
Why is let important in Sentinel analytics rules?
let makes complex detection rules readable and maintainable. By breaking
logic into named subqueries, you can tune thresholds (one variable change affects the whole rule), reuse
datasets (define failed logins once, use them in multiple joins), and add inline threat intel tables using
datatable. Professional detection engineers use let extensively.
extend
ADVANCED
extend adds new computed columns to the dataset while keeping ALL existing columns. Unlike
project (which selects), extend is purely additive.
// Add computed columns for enrichment
CommonSecurityLog // firewall/proxy logs carry the SentBytes counter
| where TimeGenerated > ago(24h)
| extend
DataMB = round(toreal(SentBytes) / 1048576, 2),
IsExternal = not(coalesce(ipv4_is_private(DestinationIP), false)),
ConnectionHour = hourofday(TimeGenerated),
IsAfterHours = hourofday(TimeGenerated) !between (8 .. 18)
| where IsExternal == true and DataMB > 100
| project TimeGenerated, SourceIP, DestinationIP, DataMB, IsAfterHours
// Extend with case() for risk scoring
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| extend RiskScore = case(
IpAddress startswith "185.", 90, // Illustrative prefix only; use real threat intel in production
IpAddress startswith "91.", 70,
Account has "admin", 80, // Admin account targeted
50 // Default score
)
| where RiskScore > 70
| order by RiskScore desc
union
ADVANCED
union combines rows from multiple tables (stacks them vertically, like SQL's UNION ALL). Use it
to query multiple log sources simultaneously. Add withsource=ColumnName to get a column showing which
table each row came from.
// Combine Windows + Azure AD failed logins into one view
union
(SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| project TimeGenerated, Account, IpAddress, Source="Windows AD"),
(SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType != "0" // ResultType is a string column
| project TimeGenerated, Account=UserPrincipalName, IpAddress=IPAddress, Source="Azure AD")
| order by TimeGenerated desc
// Union with wildcard (all tables matching pattern)
union withsource=SourceTable Security*
| where TimeGenerated > ago(1h)
| summarize count() by SourceTable // SourceTable tells you which source table
lookup
ADVANCED
lookup is an optimized left outer join specifically designed for enrichment: adding columns
from a small reference table (like a threat intel list or asset inventory) to a large event table. It's
faster than join kind=leftouter for this pattern.
// Asset inventory lookup
let AssetInventory = datatable(DeviceName:string, Department:string, Owner:string, Criticality:string)
[
"DC01", "IT Infrastructure", "John Smith", "Critical",
"FILESERVER01","Finance", "Jane Doe", "High",
"DEVLAPTOP01", "Engineering", "Bob Lee", "Medium"
];
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName has_any ("mimikatz", "procdump", "bloodhound")
| lookup AssetInventory on DeviceName
| project TimeGenerated, DeviceName, Department, Owner, Criticality, FileName, AccountName
| order by Criticality asc // Critical devices first
materialize()
ADVANCED
materialize() caches a subquery result in memory so it can be referenced multiple times
without re-execution. Critical for complex queries that use the same dataset in multiple joins or
comparisons — prevents redundant table scans.
// Without materialize: FailedLogins computed TWICE
let FailedLogins =
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize count() by Account;
// With materialize: computed ONCE, reused
let FailedLogins = materialize(
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625
| summarize FailCount = count() by Account
);
// Now FailedLogins is computed once and reused in both joins below
FailedLogins
| join kind=inner (FailedLogins | where FailCount > 20 | project Account) on Account
// Complex analysis using same dataset multiple times without re-scanning
Use materialize() when a let
subquery is referenced more than once in the same query (in multiple joins, in a self-join, or compared
against itself). Without it, KQL re-executes the subquery each time it's referenced.
render
VISUALIZATION
render is always the LAST operator in a query. It transforms the tabular results into a
chart/visualization in Azure Data Explorer, Log Analytics, and Sentinel. It doesn't change the data — only
the presentation. Essential for Sentinel Workbooks and dashboards.
| render [chartType] with (
title = "Chart Title",
xtitle = "X-Axis Label",
ytitle = "Y-Axis Label",
legend = visible
)
timechart
VISUALIZATION
timechart creates a time-series line chart. Requires a datetime column and numeric column(s).
The datetime must already be grouped with bin().
// Failed logins over time — spike detection
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4625
| summarize FailedLogins = count() by bin(TimeGenerated, 15m)
| render timechart with (title="Failed Logins per 15 Min", ytitle="Count")
// Multi-series: compare alert volumes by severity
SecurityAlert
| where TimeGenerated > ago(7d)
| summarize AlertCount = count() by bin(TimeGenerated, 1h), AlertSeverity
| render timechart with (title="Alert Volume by Severity")
// Each unique AlertSeverity value becomes its own line on the chart
// Network traffic anomaly detection (byte counters come from firewall/proxy logs)
CommonSecurityLog
| where TimeGenerated > ago(24h)
| summarize TotalMB = sum(tolong(SentBytes)) / 1048576 by bin(TimeGenerated, 1h)
| render timechart with (title="Hourly Outbound Data (MB)")
barchart / piechart / scatterchart
VISUALIZATION
// barchart: top countries with failed logins
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType != "0" // ResultType is a string column
| summarize FailCount = count() by tostring(LocationDetails.countryOrRegion)
| top 10 by FailCount desc
| render barchart with (title="Failed Logins by Country", ytitle="Count")
// piechart — alert breakdown by severity
SecurityAlert
| where TimeGenerated > ago(30d)
| summarize Count = count() by AlertSeverity
| render piechart with (title="Alert Distribution by Severity")
// scatterchart: bytes vs connections (outlier detection)
CommonSecurityLog
| where TimeGenerated > ago(24h)
| summarize TotalBytes = sum(tolong(SentBytes)), Connections = count() by DestinationIP
| project Connections, TotalBytes // first column = x-axis, second = y-axis
| render scatterchart with (title="Bytes vs Connections per IP",
xtitle="Connection Count", ytitle="Total Bytes")
// columnchart — process execution counts
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| summarize RunCount = count() by FileName
| top 15 by RunCount desc
| render columnchart
• timechart: trends over time (spike detection, baselines)
• barchart / columnchart: comparing categories
• piechart: distribution/proportions (use sparingly)
• scatterchart: correlation between two metrics (outlier detection)
Brute Force Detection
THREAT HUNT
// DETECTION: Windows Authentication Brute Force
// MITRE: T1110.001 | Severity: High | Window: 5m
let BruteForceThreshold = 10;
let TimeWindow = 5m;
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4625 // Failed logon
| where Account !endswith "$" // Exclude machine accounts
| where IpAddress != "-" and isnotempty(IpAddress)
| where Account !in ("ANONYMOUS LOGON","NETWORK SERVICE","LOCAL SERVICE")
| summarize
FailedAttempts = count(),
DistinctAccounts = dcount(Account), // grouping is by IpAddress, so count the accounts it targeted
TargetAccounts = make_set(Account),
FirstAttempt = min(TimeGenerated),
LastAttempt = max(TimeGenerated)
by IpAddress, bin(TimeGenerated, TimeWindow)
| where FailedAttempts >= BruteForceThreshold
| extend AttackDuration = LastAttempt - FirstAttempt
| order by FailedAttempts desc
// DETECTION: Azure AD Password Spray / Brute Force
SigninLogs
| where TimeGenerated > ago(1h)
| where ResultType != "0" // Failed sign-ins (ResultType is a string column)
| where ResultType != "50053" // Exclude locked accounts
| where ResultType != "50055" // Exclude expired passwords
| summarize
FailCount = count(),
UniqueUsers = dcount(UserPrincipalName),
UsersTargeted = make_set(UserPrincipalName),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by IPAddress, bin(TimeGenerated, 10m)
| extend
IsSpray = UniqueUsers > 5, // Many users from same IP = password spray
IsBrute = UniqueUsers == 1 and FailCount > 20 // One user many times = brute force
| where IsSpray or IsBrute
| extend AttackType = iff(IsSpray, "Password Spray", "Brute Force")
| project FirstSeen, LastSeen, IPAddress, AttackType, UniqueUsers, FailCount, UsersTargeted
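Before a rule like this runs on production data, its classification logic can be exercised against synthetic rows. The sketch below is an assumption-laden test harness: the IPs and user names are invented and generated with range, and only the thresholds (more than 5 users, more than 20 failures) are taken from the query above.

```kql
// Six different users failing from one IP (spray pattern)
let Spray = range i from 1 to 6 step 1
    | project TimeGenerated = datetime(2025-01-15 10:00) + i * 1s,
              IPAddress = "203.0.113.7",
              UserPrincipalName = strcat("user", tostring(i), "@contoso.com");
// One user failing 25 times from another IP (brute-force pattern)
let Brute = range i from 1 to 25 step 1
    | project TimeGenerated = datetime(2025-01-15 10:00) + i * 1s,
              IPAddress = "198.51.100.9",
              UserPrincipalName = "admin@contoso.com";
union Spray, Brute
| summarize FailCount = count(), UniqueUsers = dcount(UserPrincipalName)
    by IPAddress, bin(TimeGenerated, 10m)
| extend AttackType = case(
    UniqueUsers > 5,                     "Password Spray",
    UniqueUsers == 1 and FailCount > 20, "Brute Force",
    "None")
```

With these rows, 203.0.113.7 should come out as Password Spray and 198.51.100.9 as Brute Force; changing the range bounds is a quick way to probe the thresholds.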
Impossible Travel Detection
THREAT HUNT
// DETECTION: Impossible Travel: logins from different countries within a short window
// MITRE: T1078 | Severity: High
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == "0" // Successful logins only (ResultType is a string column)
| where isnotempty(IPAddress)
| extend Country = tostring(LocationDetails.countryOrRegion)
| where isnotempty(Country)
| summarize
Countries = make_set(Country),
IPs = make_set(IPAddress),
LoginCount = count(),
FirstLogin = min(TimeGenerated),
LastLogin = max(TimeGenerated)
by UserPrincipalName
| extend CountryCount = array_length(Countries)
| where CountryCount > 1 // User logged in from 2+ countries
| extend TimeDiff = LastLogin - FirstLogin
| where TimeDiff < 4h // All of the user's logins fall within 4 hours: travel between countries is unlikely
| extend
RiskFlag = "IMPOSSIBLE TRAVEL DETECTED",
TravelWindow = format_timespan(TimeDiff, "hh:mm")
| project UserPrincipalName, Countries, CountryCount, IPs, LoginCount, FirstLogin, LastLogin, TravelWindow, RiskFlag
| order by CountryCount desc
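The query above compares a user's first and last login of the day, so it can miss a country change that sits inside a longer session history. One alternative, sketched here on invented rows, compares each login with the previous login of the same user using prev(); the 4-hour gap is the same assumption as above.

```kql
datatable(TimeGenerated:datetime, UserPrincipalName:string, Country:string)
[
    datetime(2025-01-15 08:00), "alice@contoso.com", "US",
    datetime(2025-01-15 09:30), "alice@contoso.com", "DE",
    datetime(2025-01-15 08:00), "bob@contoso.com",   "US",
    datetime(2025-01-15 20:00), "bob@contoso.com",   "FR"
]
| sort by UserPrincipalName asc, TimeGenerated asc   // sorting also serializes the rows for prev()
| extend
    PrevUser    = prev(UserPrincipalName),
    PrevCountry = prev(Country),
    PrevTime    = prev(TimeGenerated)
| where PrevUser == UserPrincipalName and PrevCountry != Country
| extend Gap = TimeGenerated - PrevTime
| where Gap < 4h
| project UserPrincipalName, PrevCountry, Country, Gap
```

Only the alice row survives (US to DE in 90 minutes); bob's change of country is 12 hours apart and is filtered out. Replacing the datatable with the filtered SigninLogs rows applies the same logic to real data.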
PowerShell Abuse Detection
THREAT HUNT
// DETECTION: Suspicious PowerShell Execution
// MITRE: T1059.001 | Severity: High
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where FileName in~ ("powershell.exe", "pwsh.exe", "powershell_ise.exe")
| where ProcessCommandLine has_any (
"-enc", "-EncodedCommand", // Base64 encoded commands
"-nop", "-NoProfile", // Bypass profile
"-exec bypass", "-ExecutionPolicy Bypass", // Bypass execution policy
"IEX", "Invoke-Expression", // Dynamic code execution
"Invoke-WebRequest", "wget", "curl", // Download
"DownloadString", "DownloadFile", // Download methods
"Net.WebClient", // Web client usage
"FromBase64String", // Base64 decode
"System.Reflection.Assembly", // Reflective loading
"Add-Type" // Inline compilation
)
| where not(InitiatingProcessFileName in~ ("svchost.exe","services.exe","wuauclt.exe"))
| extend
HasEncoding = ProcessCommandLine has_any ("-enc","-EncodedCommand","FromBase64String"),
HasDownload = ProcessCommandLine has_any ("WebClient","WebRequest","DownloadString"),
HasExecution = ProcessCommandLine has_any ("IEX","Invoke-Expression"),
RiskScore = toint(0)
| extend RiskScore = RiskScore
+ iff(HasEncoding, 30, 0)
+ iff(HasDownload, 30, 0)
+ iff(HasEncoding and HasDownload, 20, 0) // Extra points if both
+ iff(HasExecution, 20, 0)
| where RiskScore > 30
| project TimeGenerated, DeviceName, AccountName, FileName,
ProcessCommandLine, RiskScore, HasEncoding, HasDownload, HasExecution
| order by RiskScore desc, TimeGenerated desc
Privilege Escalation Detection
THREAT HUNT
// DETECTION: Account added to privileged group
// MITRE: T1098 (Account Manipulation) | EventID 4728/4732/4756
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID in (4732, 4756, 4728) // Added to local/universal/global security group
| parse EventData with * '"TargetUserName">' TargetUser '<' *
| parse EventData with * '"MemberName">' AddedMember '<' *
| where TargetUser in~ (
"Administrators","Domain Admins","Enterprise Admins",
"Schema Admins","Account Operators","Backup Operators"
)
| project TimeGenerated, EventID, Computer, AddedBy = Account,
AddedMember, PrivilegedGroup = TargetUser
// DETECTION: Suspicious use of special privileges (EventID 4672)
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4672 // Special privileges assigned
| where Account !in ("SYSTEM","ANONYMOUS LOGON","LOCAL SERVICE")
| where Account !endswith "$" // Exclude machine accounts
| where PrivilegeList has_any (
"SeDebugPrivilege", // Process memory access (mimikatz)
"SeImpersonatePrivilege", // Token impersonation
"SeTakeOwnershipPrivilege", // Take ownership of objects
"SeLoadDriverPrivilege" // Load kernel drivers
)
| project TimeGenerated, Account, Computer, PrivilegeList
Suspicious Process Execution
THREAT HUNT
// DETECTION: Living-off-the-Land Binaries (LOLBins) abuse
// MITRE: T1218 | Severity: Medium-High
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| extend SuspiciousActivity = case(
// certutil used for download/decode
FileName == "certutil.exe" and ProcessCommandLine has_any ("-urlcache","-decode","-encode"),
"CertUtil Abuse - Possible Download/Decode",
// regsvr32 running scripts
FileName == "regsvr32.exe" and ProcessCommandLine has_any ("scrobj","http","ftp"),
"Regsvr32 Squiblydoo - Script Execution",
// mshta executing remote content
FileName == "mshta.exe" and ProcessCommandLine has_any ("http","vbscript","javascript"),
"MSHTA Remote Code Execution",
// bitsadmin for download
FileName == "bitsadmin.exe" and ProcessCommandLine has "/transfer",
"BITSAdmin Download",
// wmic for lateral movement
FileName == "wmic.exe" and ProcessCommandLine has_any ("process call create","shadowcopy delete"),
"WMIC Suspicious Command",
// msiexec running from unusual location
FileName == "msiexec.exe" and ProcessCommandLine has_any ("http","ftp","\\\\"),
"MSIExec Remote Payload",
""
)
| where isnotempty(SuspiciousActivity)
| project TimeGenerated, DeviceName, AccountName, FileName,
ProcessCommandLine, SuspiciousActivity, InitiatingProcessFileName
| order by TimeGenerated desc
// DETECTION: LSASS credential dumping via known command-line patterns
// (process-creation events do not record memory access, so this matches dump tooling instead)
DeviceProcessEvents
| where TimeGenerated > ago(24h)
| where InitiatingProcessFileName !in~
("MsMpEng.exe","svchost.exe","csrss.exe","werfault.exe","taskmgr.exe")
| where ProcessCommandLine has "lsass"
or (ProcessCommandLine has "comsvcs" and ProcessCommandLine has "MiniDump")
| project TimeGenerated, DeviceName, InitiatingProcessFileName, FileName, AccountName, ProcessCommandLine
Lateral Movement Detection
THREAT HUNT
// DETECTION: Pass-the-Hash / Lateral Movement (EventID 4624 Type 3 = Network Logon)
// MITRE: T1550.002
SecurityEvent
| where TimeGenerated > ago(24h)
| where EventID == 4624
| where LogonType == 3 // Network logon (common in lateral movement)
| where AuthenticationPackageName has "NTLM" // NTLM = no Kerberos = suspicious
| where Account !endswith "$"
| where WorkstationName != Computer // Source != destination
| where not(Account has "ANONYMOUS LOGON")
| summarize
TargetMachines = make_set(Computer),
SourceMachines = make_set(WorkstationName),
HopCount = dcount(Computer),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by Account, IpAddress
| where HopCount > 2 // Account accessed 3+ unique machines
| extend LateralMovementIndicator = "NTLM Network Logon Across Multiple Hosts"
| order by HopCount desc
// DETECTION: WinRM / Remote PowerShell usage (successful WinRM logons recorded in IdentityLogonEvents)
IdentityLogonEvents
| where TimeGenerated > ago(24h)
| where Protocol == "WinRM"
| where ActionType == "LogonSuccess"
| summarize
Destinations = make_set(TargetDeviceName),
HopCount = dcount(TargetDeviceName),
LoginCount = count()
by AccountName, DeviceName
| where HopCount > 1
| order by HopCount desc
Data Exfiltration Detection
THREAT HUNT
// DETECTION: Anomalous outbound data volume
// MITRE: T1041 | Severity: High
// Source: CommonSecurityLog (firewall/proxy), which carries SentBytes and ReceivedBytes;
// DeviceNetworkEvents has no byte-count columns
let ExfilThresholdMB = 500; // Alert if >500MB sent to single external IP
CommonSecurityLog
| where TimeGenerated > ago(24h)
| where isnotempty(DestinationIP)
| where not(ipv4_is_private(DestinationIP)) // Drops 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
| where DestinationIP != "127.0.0.1"
| where DestinationPort !in (80, 443) // Non-standard port = more suspicious
| summarize
TotalBytesSent = sum(tolong(SentBytes)),
TotalBytesRecv = sum(tolong(ReceivedBytes)),
ConnectionCount = count(),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by SourceIP, SourceUserName, DestinationIP, DestinationPort
| extend TotalMB = round(toreal(TotalBytesSent) / 1048576, 2)
| where TotalMB > ExfilThresholdMB
| extend ExfiltrationRisk = case(
TotalMB > 5000, "CRITICAL - >5GB Exfiltrated",
TotalMB > 1000, "HIGH - >1GB Exfiltrated",
TotalMB > 500, "MEDIUM - >500MB Exfiltrated",
"Monitor"
)
| project SourceIP, SourceUserName, DestinationIP, DestinationPort, TotalMB,
ConnectionCount, FirstSeen, LastSeen, ExfiltrationRisk
| order by TotalMB desc
// DETECTION: DNS exfiltration (large number of unique DNS subdomains)
// Data exfiltrated encoded in DNS queries
DnsEvents
| where TimeGenerated > ago(24h)
| where isnotempty(Name)
| extend Domain = tostring(split(Name, ".")[-2]) + "." + tostring(split(Name, ".")[-1])
| summarize
SubdomainCount = dcount(Name),
QueryCount = count(),
UniqueQueryLen = dcount(strlen(Name))
by ClientIP, Domain
| where SubdomainCount > 100 // Many unique subdomains = possible DNS tunneling
| where strlen(Domain) > 0
| extend DNSTunnelingIndicator = "High unique subdomain count - possible DNS tunneling"
| order by SubdomainCount desc
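The Domain extraction in the DNS query above can be tried on inline data before running it against DnsEvents. The following is a minimal sketch: the names in the datatable are hypothetical samples, not real telemetry, and the Labels column is introduced only for illustration. It takes the last two labels of each name, so it will group multi-part suffixes such as co.uk incorrectly.

```kql
// Hypothetical DNS names standing in for DnsEvents.Name
datatable(Name: string)
[
    "a1b2c3.tunnel.example.com",
    "d4e5f6.tunnel.example.com",
    "www.contoso.com"
]
// Split on dots; negative indexes count from the end of the array
| extend Labels = split(Name, ".")
| extend Domain = strcat(tostring(Labels[-2]), ".", tostring(Labels[-1]))
// Count distinct full names per extracted domain, as the detection does
| summarize SubdomainCount = dcount(Name) by Domain
| order by SubdomainCount desc
```

With this sample, the two tunnel names should fall under example.com and the third under contoso.com. In a real hunt the SubdomainCount threshold would need tuning against legitimate high-cardinality domains such as CDNs.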
KQL Learning Roadmap
ROADMAP
Follow this progression to go from complete beginner to Detection Engineer. Each level builds on the previous.
▸ Pipeline concept
▸ where, take, limit
▸ project, sort
▸ top, distinct
▸ ago(), now()
▸ count(), summarize
▸ Basic string ops
Goal: Run your first SOC query
▸ bin() & timechart
▸ let variables
▸ extend, iff(), case()
▸ parse, extract()
▸ parse_json(), dynamic
▸ make_list, make_set
▸ arg_max, arg_min
Goal: Write detection rules
▸ union, lookup
▸ mv-expand, bag_unpack
▸ regex mastery
▸ materialize()
▸ render & workbooks
▸ datatable inline
▸ Cross-table queries
Goal: Complex multi-source correlation
▸ Brute force detection
▸ Impossible travel
▸ PowerShell abuse
▸ Lateral movement
▸ Data exfiltration
▸ Threat intel joins
▸ Anomaly hunting
Goal: Proactive threat hunting
▸ UEBA / ML functions
▸ series_decompose
▸ Workbook authoring
▸ Custom parsers (ASIM)
▸ Hunting notebooks
▸ SOAR integration
▸ SC-200 certification
Goal: Build the detection platform
1. Start with SecurityAlert | where AlertName has "Brute" to find the alert details.
2. Extract the attacker IP from ExtendedProperties using parse_json().
3. Query SecurityEvent for EventID 4625 filtered to that IP to see the scope.
4. Check SecurityEvent for EventID 4624 from the same IP to see if any login succeeded.
5. If successful, query SigninLogs for that account's subsequent activity.
6. Use make_set() to collect all accessed resources, then pivot to those resources.

What is the difference between join kind=leftanti and join kind=leftsemi?
leftsemi keeps left rows that HAVE a match in the right table (use for: "show me events from known-bad IPs"). leftanti keeps left rows that DON'T have a match (use for: "show me processes not in my allowlist"). Both return only left table columns, so they are purely filter operations.

1. Put where TimeGenerated > ago() FIRST: it enables time partition pruning.
2. Use has instead of contains (has uses the term index; contains scans for substrings).
3. Filter on indexed fields (EventID, Computer) before string operations.
4. Use project early to reduce columns flowing through the pipeline.
5. Replace chained equality checks (where X == a or X == b or X == c) with where X in (a, b, c).
6. Use materialize() if the same subquery is referenced multiple times.
7. Avoid using search in production rules.

Query SigninLogs for successful logins. Use summarize make_set(country) by user to collect unique countries per user. Use array_length() to find users with >1 country. Use min(TimeGenerated) and max(TimeGenerated) to calculate the time window, then filter where the window is shorter than physically possible travel time (e.g., <4 hours).

Explain arg_max(TimeGenerated, *) and when you would use it.
arg_max(TimeGenerated, *) within a summarize returns the entire row where TimeGenerated is at its maximum value for each group. The * means "keep all other columns from that row." Use it to get "the most recent event for each user/device/IP" with all original fields intact, which is essential for getting the current state of an entity without losing context.
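The arg_max behavior described above can be seen on a small inline table. This is a minimal sketch: the rows, user names, and IP addresses are hypothetical sample data shaped loosely like SigninLogs, not real sign-in records.

```kql
// Hypothetical sign-in rows; column names mirror SigninLogs for readability
datatable(TimeGenerated: datetime, UserPrincipalName: string, IPAddress: string, ResultType: string)
[
    datetime(2024-01-01 08:00:00), "alice@contoso.com", "203.0.113.10", "0",
    datetime(2024-01-01 09:30:00), "alice@contoso.com", "198.51.100.7", "50126",
    datetime(2024-01-01 07:15:00), "bob@contoso.com",   "203.0.113.22", "0"
]
// One row per user: the row with the latest TimeGenerated, all columns kept
| summarize arg_max(TimeGenerated, *) by UserPrincipalName
```

The result has one row per UserPrincipalName. For alice it is the 09:30 row, including that row's IPAddress and ResultType, which is what distinguishes arg_max from a plain max(TimeGenerated).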
What is bin() and why is it essential for SOC dashboards?
bin(TimeGenerated, interval) rounds timestamps down to fixed-size buckets. Without it, almost every event has a unique timestamp, so you can't group by time. With it, you can count events per 5-minute window, per hour, or per day, which enables trend analysis, spike detection, and time-series charts (render timechart). It's the core of every Sentinel workbook time visualization.

- Full Brute Force Investigation: Write a complete query pipeline that (1) detects accounts with >5 failed logins in 15 minutes, (2) enriches with whether any of those accounts had a subsequent successful login, (3) for successful post-brute-force logins, pivots to show what resources the account accessed using SigninLogs, using let for each stage.
- Multi-source Correlation: Write a query using union to combine SecurityEvent (EventID 4625) and SigninLogs (ResultType != 0) failed logins, normalize the columns (Account/UPN, IP, Time, Source), and then build a unified brute force report with counts per attacker IP across both sources.
- Detection Rule with Threat Intel: Create a complete detection rule using let to define: (a) a datatable of 5 known malicious IPs, (b) a query of DeviceNetworkEvents for the last 24h, (c) a join kind=leftsemi to filter connections to those IPs, (d) enrichment of the results with device info using extend, and (e) a render timechart of matches per hour. Add comments documenting MITRE technique, author, and threshold.