WPNinjas HeaderWPNinjas Header

Microsoft Sentinel ASIM Parser demystified

In the realm of cybersecurity, the ability to efficiently parse and analyze vast amounts of data is crucial. Microsoft Sentinel offers a powerful solution for this purpose. One of its key features is the ASIM (Advanced Security Information Model) parser, which plays a significant role in data normalization and enrichment.

But when reading the docs and especially when you would like to write your own parser it looks complicated. Thats why I created this blog and show how to create your custom ASIM parser.

Table of Contents

Understanding the ASIM Parser Model

In the documentation we can find the following architectural diagram which highlights the inner workings, but in my opinion it’s not showing the important part, how it really workes.

At the end ASIM is a more or less nothing complicated and just a brilliant way in leveraging KQL functions as abstraction layer. There are predefined schemas for the following datatypes which cover the most important use cases:

  • Audit event parsers
  • Authentication parsers
  • DNS parsers
  • File Activity parsers
  • Network Session parsers
  • Process Event parsers
  • Registry Event parsers
  • Web Session parsers
In general, all parsers should have two versions, one which does not support filtering parameters (ASim) and another which supports parameters (Im) to filter results already at the beginning and therefore helps to improve performance.

In the following diagram I show the relationships of the different functions:

As you can see the _Im_NetworkSession function references three other functions and is defining isfuzzy=true is ignoring when one of the referenced tables or function is not available which is by default the case for the Im_NetworkSessionCustom and Im_NetworkSessionSolutions. The Custom we will create as soon we build our custom parser.

union isfuzzy=true
_Im_NetworkSessionBuiltIn(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack),
Im_NetworkSessionSolutions(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack),
Im_NetworkSessionCustom(starttime= starttime, endtime= endtime, srcipaddr_has_any_prefix= srcipaddr_has_any_prefix, dstipaddr_has_any_prefix= dstipaddr_has_any_prefix, ipaddr_has_any_prefix= ipaddr_has_any_prefix, dstportnumber= dstportnumber, hostname_has_any= hostname_has_any, dvcaction= dvcaction, eventresult= eventresult, pack= pack)
In the Im_NetworkSessionCustom you can then reference your custom parsers which you should always name according to the naming convention:
  • Im_%Schema%_%Vendor%%Product%%TwoDigitVersionNumber%
  • ASim_%Schema%_%Vendor%%Product%%TwoDigitVersionNumber%

Normalize at Ingestion-Time vs Parsers

Parsing during query execution can be resource intensive and decrease performance, but you have stored the logs as they are provided by the vendor, avoided potential duplicates and easier development. Both options have their advantages which are highlighted well in Microsoft Docs.

As you can see tables already exist but do not contain data and are therefore hidden if you do not enable to show empty tables. 

I have not chosen to use this variant as I think it would not be beneficial and I could lose some data/fields I potentially will use at a later point in time.

Example Source-specific parsers based on Ubiquiti Dreambox Pro

So, when creating a source-specific custom parser we first need to create the two functions. I’m using the raw table in which I ingest the log data. So, check out my previous blogs about how to ingest Ubiquiti Dreammachine Pro including the Suricata Logs into Sentinel. At the end the two required functions are simple and mainly consist of parsing and transforming field names or values according to the schema. 

First I created the Im_NetworkSession_UbiquitiDmPro01 function:

let parser = (
  starttime:datetime=datetime(null),
  endtime:datetime=datetime(null),
  srcipaddr_has_any_prefix:dynamic=dynamic([]),
  dstipaddr_has_any_prefix:dynamic=dynamic([]),
  ipaddr_has_any_prefix:dynamic=dynamic([]),
  dstportnumber:int=int(null), 
  hostname_has_any:dynamic=dynamic([]), // not available from source
  dvcaction:dynamic=dynamic([]),
  eventresult:string='*',
  disabled:bool=false
) 
{
let ActionTranslation=datatable(DvcAction:string,StandardisedAction:string,StandardizedResult:string,Severity:string)[
      'Accepted','Allow','Success','Informational',
      'Denied','Deny','Failure','Low',
      'Other','Allow','Success','Informational'
   ];
let src_or_any=set_union(srcipaddr_has_any_prefix, ipaddr_has_any_prefix); 
let dst_or_any=set_union(dstipaddr_has_any_prefix, ipaddr_has_any_prefix); 
let SuricataAlerts =() { 
    Ubiquiti_CL 
    | where not(disabled) // required for ASIM parser
    | where
       (isnull(starttime) or TimeGenerated >= starttime)
        and  (isnull(endtime) or TimeGenerated <= endtime)
    | where facility == 21 // filter for Suricata events
    | extend MessageJson= trim_start(@"DreamMachinePro suricata\[[0-9]*\]: ",message)
    | extend // Parse Message
        SrcMacAddr = tostring(parse_json(MessageJson).src_mac),
        SrcIpAddr = tostring(parse_json(MessageJson).src_ip),
        SrcPortNumber = tostring(parse_json(MessageJson).src_port),
        DstIpAddr = tostring(parse_json(MessageJson).dest_ip),
        DstMacAddr = tostring(parse_json(MessageJson).dest_mac),
        DstPortNumber = tostring(parse_json(MessageJson).dest_port),
        FlowId = tostring(parse_json(MessageJson).flow_id),
        NetworkProtocol = tostring(parse_json(MessageJson).proto),
        DvcInboundInterface = tostring(parse_json(MessageJson).in_iface),
        EventCategory = tostring(parse_json(MessageJson).event_type),
        Service  = tostring(parse_json(MessageJson).app_protocol),
        DvcAction  = tostring(parse_json(MessageJson).alert.action),
        AlertMetadata = tostring(parse_json(MessageJson).alert.metadata),
        AlertSignature = tostring(parse_json(MessageJson).alert.signature),        
        AlertCategory = tostring(parse_json(MessageJson).alert.category)
    | where (isnull(dstportnumber) or DstPortNumber==dstportnumber)
    | extend temp_SrcMatch=has_any_ipv4_prefix(SrcIpAddr,src_or_any)
         , temp_DstMatch=has_any_ipv4_prefix(DstIpAddr,dst_or_any)
    | extend ASimMatchingIpAddr=case(
         array_length(src_or_any) == 0 and array_length(dst_or_any) == 0 ,"-",
         temp_SrcMatch and temp_DstMatch, "Both",
         temp_SrcMatch, "SrcIpAddr",
         temp_DstMatch, "DstIpAddr",
         "No match"
       )
    | where ASimMatchingIpAddr != "No match" 
    | project-away temp_*
    | lookup ActionTranslation on DvcAction // lookup for Schema defined Action Names
    | extend // Build required and optional Schema fields
        EventStartTime = TimeGenerated,
        EventEndTime = TimeGenerated,
        EventSchema = "NetworkSession",
        EventVendor = "Ubiquiti",
        EventSchemaVersion = "0.2.6",
        EventType = "IDS",
        EventResult = StandardizedResult,
        DvcOriginalAction = DvcAction,
        EventCount = 1,
        IpAddr=SrcIpAddr,
        Src=SrcIpAddr,
        Dst=DstIpAddr,
        DvcAction = StandardisedAction,
        DvcInterface = DvcInboundInterface,
        EventSeverity = Severity,
        DvcIpAddr = host,
        Dvc = host
    | where 
	(array_length(dvcaction) == 0 or DvcAction in (dvcaction))
    	and (eventresult == "*" or eventresult==EventResult) 
};
let firewall =() { 
    Ubiquiti_CL 
    | where not(disabled) // required for ASIM parser
    | where
       (isnull(starttime) or TimeGenerated >= starttime)
        and  (isnull(endtime) or TimeGenerated <= endtime)
    | where message matches regex @'\[.+\]\sDESCR="+[A-z\s]+"\sIN='
    | extend  // Parse Message
        EventTime = extract(@'\<\d+\>(\w+\s+\w+\s+\d+:\d+:\d+)\s\w+,', 1, message),
        DvcType = iif(extract(@'\d+\:\d+\:\d+\s(\w+),[A-Fa-f0-9]{12}', 1, message)!="", extract(@'\d+\:\d+\:\d+\s(\w+),[A-Fa-f0-9]{12}', 1, message), extract(@'\d+\:\d+\:\d+\s[A-Fa-f0-9]{12},([A-Za-z-]+)-', 1, message)),
        DvcMacAddr = replace(@'(:)$', @'', replace(@'(\w{2})', @'\1:', extract(@'([A-Fa-f0-9]{12}),' , 1, message))),
        FirmwareVersion = iif(extract(@'[A-Fa-f0-9]{12},v(.*?)\:', 1, message)!="", extract(@'[A-Fa-f0-9]{12},v(.*?)\:', 1, message), extract(@'[A-Fa-f0-9]{12},[A-Za-z-]+([\d\.\+]+)[\:\s]', 1, message)),
        DvcIpAddr = host,
        EventCategory = 'firewall',
        FlowId = extract(@'ID=(.*?)\s', 1, message),
        DvcInboundInterface = extract(@'IN=(.*?)\s', 1, message),
        DvcOutboundInterface = extract(@'OUT=(.*?)\s', 1, message),
        dvc_action = extract(@'\[.+(-(RET|D|R)-).+\]', 2, message),
        NetworkRuleName = extract(@'kernel:\s+\[(\S+)-\w\]', 1, message),
        DstMacAddr = extract(@'MAC=([a-fA-F0-9:]{17}):', 1, message),
        SrcMacAddr = extract(@'MAC=[a-fA-F0-9:]{17}:([a-fA-F0-9:]{17})\s', 1, message),
        SrcIpAddr = extract(@'SRC=(.*?)\s', 1, message),
        SrcPortNumber = extract(@'SPT=(.*?)\s', 1, message),
        DstIpAddr = extract(@'DST=(.*?)\s', 1, message),
        DstPortNumber = extract(@'DPT=(.*?)\s', 1, message),
        NetworkBytes = extract(@'LEN=(.*?)\s', 1, message),
        Tos = extract(@'TOS=(.*?)\s', 1, message),
        Prec = extract(@'PREC=(.*?)\s', 1, message),
        Ttl = extract(@'TTL=(.*?)\s', 1, message),
        NetworkProtocol = extract(@'PROTO=(.*?)\s', 1, message),
        Window = extract(@'WINDOW=(.*?)\s', 1, message),
        Res = extract(@'RES=(.*?)\s', 1, message),
        Mark = extract(@'MARK=(.*?)\s', 1, message)
    | where (isnull(dstportnumber) or DstPortNumber==dstportnumber)
    | extend temp_SrcMatch=has_any_ipv4_prefix(SrcIpAddr,src_or_any)
         , temp_DstMatch=has_any_ipv4_prefix(DstIpAddr,dst_or_any)
    | extend ASimMatchingIpAddr=case(
         array_length(src_or_any) == 0 and array_length(dst_or_any) == 0 ,"-",
         temp_SrcMatch and temp_DstMatch, "Both",
         temp_SrcMatch, "SrcIpAddr",
         temp_DstMatch, "DstIpAddr",
         "No match"
       )
    | where ASimMatchingIpAddr != "No match" 
    | project-away temp_*
    | extend DvcAction = case(dvc_action == "RET", "Accepted",
                            dvc_action == "D", "Denied",
                            dvc_action == "R", "Rejected",
                            "Other")
    | lookup ActionTranslation on DvcAction  // lookup for Schema defined Action Names
    | extend // Build required and optional Schema fields
        EventStartTime = TimeGenerated,
        EventEndTime = TimeGenerated,
        EventSchema = "NetworkSession",
        EventVendor = "Ubiquiti",
        EventSchemaVersion = "0.2.6",
        EventType = "NetworkSession",
        EventResult = StandardizedResult,
        DvcOriginalAction = DvcAction,
        EventCount = 1,
        DstNatIpAddr = iff(message has "DNAT",DstIpAddr,""), // If NAT Forwarding set additional field
        DstNatPortNumber = iff(message has "DNAT",DstPortNumber,""), // If NAT Forwarding set additional field
        SrcNatIpAddr = iff(message has "DNAT",SrcIpAddr,""), // If NAT Forwarding set additional field
        SrcNatPortNumber = iff(message has "DNAT",SrcPortNumber,""), // If NAT Forwarding set additional field
        IpAddr=SrcIpAddr,
        Src=SrcIpAddr,
        Dst=DstIpAddr,
        DvcAction = StandardisedAction,
        DvcInterface = DvcInboundInterface,
        EventSeverity = Severity,
        Dvc = DvcIpAddr
    | where 
	(array_length(dvcaction) == 0 or DvcAction in (dvcaction))
    	and (eventresult == "*" or eventresult==EventResult) 
};
union isfuzzy=true  SuricataAlerts, firewall
};
parser (starttime=starttime, endtime=endtime, srcipaddr_has_any_prefix=srcipaddr_has_any_prefix, dstipaddr_has_any_prefix=dstipaddr_has_any_prefix, ipaddr_has_any_prefix=ipaddr_has_any_prefix, dstportnumber=dstportnumber, hostname_has_any=hostname_has_any, dvcaction=dvcaction, eventresult=eventresult, disabled=disabled)

When saving the function you need to define the parameters correctly as it can be seen in this screenshot:

Then we can create the parameter less version of the parser ASim_NetworkSession_UbiquitiDmPro01:

let parser=(disabled:bool=false) 
{
let ActionTranslation=datatable(DvcAction:string,StandardisedAction:string,StandardizedResult:string,Severity:string)[
      'Accepted','Allow','Success','Informational',
      'Denied','Deny','Failure','Low',
      'Other','Allow','Success','Informational'
   ];
let SuricataAlerts =() { 
    Ubiquiti_CL 
    | where not(disabled) // required for ASIM parser
    | where facility == 21 // filter for Suricata events
    | extend MessageJson= trim_start(@"DreamMachinePro suricata\[[0-9]*\]: ",message)
    | extend // Parse Message
        SrcMacAddr = tostring(parse_json(MessageJson).src_mac),
        SrcIpAddr = tostring(parse_json(MessageJson).src_ip),
        SrcPortNumber = tostring(parse_json(MessageJson).src_port),
        DstIpAddr = tostring(parse_json(MessageJson).dest_ip),
        DstMacAddr = tostring(parse_json(MessageJson).dest_mac),
        DstPortNumber = tostring(parse_json(MessageJson).dest_port),
        FlowId = tostring(parse_json(MessageJson).flow_id),
        NetworkProtocol = tostring(parse_json(MessageJson).proto),
        DvcInboundInterface = tostring(parse_json(MessageJson).in_iface),
        EventCategory = tostring(parse_json(MessageJson).event_type),
        Service  = tostring(parse_json(MessageJson).app_protocol),
        DvcAction  = tostring(parse_json(MessageJson).alert.action),
        AlertMetadata = tostring(parse_json(MessageJson).alert.metadata),
        AlertSignature = tostring(parse_json(MessageJson).alert.signature),        
        AlertCategory = tostring(parse_json(MessageJson).alert.category)   
    | lookup ActionTranslation on DvcAction // lookup for Schema defined Action Names
    | extend // Build required and optional Schema fields
        EventStartTime = TimeGenerated,
        EventEndTime = TimeGenerated,
        EventSchema = "NetworkSession",
        EventVendor = "Ubiquiti",
        EventSchemaVersion = "0.2.6",
        EventType = "IDS",
        EventResult = StandardizedResult,
        DvcOriginalAction = DvcAction,
        EventCount = 1,
        IpAddr=SrcIpAddr,
        Src=SrcIpAddr,
        Dst=DstIpAddr,
        DvcAction = StandardisedAction,
        DvcInterface = DvcInboundInterface,
        EventSeverity = Severity,
        DvcIpAddr = host,
        Dvc = host
};
let firewall =() { 
    Ubiquiti_CL 
    | where not(disabled) // required for ASIM parser
    | where message matches regex @'\[.+\]\sDESCR="+[A-z\s]+"\sIN='
    | extend  // Parse Message
        EventTime = extract(@'\<\d+\>(\w+\s+\w+\s+\d+:\d+:\d+)\s\w+,', 1, message),
        DvcType = iif(extract(@'\d+\:\d+\:\d+\s(\w+),[A-Fa-f0-9]{12}', 1, message)!="", extract(@'\d+\:\d+\:\d+\s(\w+),[A-Fa-f0-9]{12}', 1, message), extract(@'\d+\:\d+\:\d+\s[A-Fa-f0-9]{12},([A-Za-z-]+)-', 1, message)),
        DvcMacAddr = replace(@'(:)$', @'', replace(@'(\w{2})', @'\1:', extract(@'([A-Fa-f0-9]{12}),' , 1, message))),
        FirmwareVersion = iif(extract(@'[A-Fa-f0-9]{12},v(.*?)\:', 1, message)!="", extract(@'[A-Fa-f0-9]{12},v(.*?)\:', 1, message), extract(@'[A-Fa-f0-9]{12},[A-Za-z-]+([\d\.\+]+)[\:\s]', 1, message)),
        DvcIpAddr = host,
        EventCategory = 'firewall',
        FlowId = extract(@'ID=(.*?)\s', 1, message),
        DvcInboundInterface = extract(@'IN=(.*?)\s', 1, message),
        DvcOutboundInterface = extract(@'OUT=(.*?)\s', 1, message),
        dvc_action = extract(@'\[.+(-(RET|D|R)-).+\]', 2, message),
        NetworkRuleName = extract(@'kernel:\s+\[(\S+)-\w\]', 1, message),
        DstMacAddr = extract(@'MAC=([a-fA-F0-9:]{17}):', 1, message),
        SrcMacAddr = extract(@'MAC=[a-fA-F0-9:]{17}:([a-fA-F0-9:]{17})\s', 1, message),
        SrcIpAddr = extract(@'SRC=(.*?)\s', 1, message),
        SrcPortNumber = extract(@'SPT=(.*?)\s', 1, message),
        DstIpAddr = extract(@'DST=(.*?)\s', 1, message),
        DstPortNumber = extract(@'DPT=(.*?)\s', 1, message),
        NetworkBytes = extract(@'LEN=(.*?)\s', 1, message),
        Tos = extract(@'TOS=(.*?)\s', 1, message),
        Prec = extract(@'PREC=(.*?)\s', 1, message),
        Ttl = extract(@'TTL=(.*?)\s', 1, message),
        NetworkProtocol = extract(@'PROTO=(.*?)\s', 1, message),
        Window = extract(@'WINDOW=(.*?)\s', 1, message),
        Res = extract(@'RES=(.*?)\s', 1, message),
        Mark = extract(@'MARK=(.*?)\s', 1, message)
    | extend DvcAction = case(dvc_action == "RET", "Accepted",
                            dvc_action == "D", "Denied",
                            dvc_action == "R", "Rejected",
                            "Other")
    | lookup ActionTranslation on DvcAction  // lookup for Schema defined Action Names
    | extend // Build required and optional Schema fields
        EventStartTime = TimeGenerated,
        EventEndTime = TimeGenerated,
        EventSchema = "NetworkSession",
        EventVendor = "Ubiquiti",
        EventSchemaVersion = "0.2.6",
        EventType = "NetworkSession",
        EventResult = StandardizedResult,
        DvcOriginalAction = DvcAction,
        EventCount = 1,
        DstNatIpAddr = iff(message has "DNAT",DstIpAddr,""), // If NAT Forwarding set additional field
        DstNatPortNumber = iff(message has "DNAT",DstPortNumber,""), // If NAT Forwarding set additional field
        SrcNatIpAddr = iff(message has "DNAT",SrcIpAddr,""), // If NAT Forwarding set additional field
        SrcNatPortNumber = iff(message has "DNAT",SrcPortNumber,""), // If NAT Forwarding set additional field
        IpAddr=SrcIpAddr,
        Src=SrcIpAddr,
        Dst=DstIpAddr,
        DvcAction = StandardisedAction,
        DvcInterface = DvcInboundInterface,
        EventSeverity = Severity,
        Dvc = DvcIpAddr};
union isfuzzy=true  SuricataAlerts, firewall
};
parser (disabled)

Integrate source-specific to unifying parsers

First you need to check if you already have the Im_NetworkSessionCustom and ASim_NetworkSessionCustom. You would see it as in the next screenshot. If not you can deploy them from the Microsoft Sentinel Github Repo or with the following button:

Deploy to Azure

First open the ASim_NetworkSessionCustom according to the steps you can see in the next screenshot.

Adjust the function to include your parser and then save it.

Now open the Im_NetworkSessionCustom function like the parameter less and add your new parser as well. In this file you need also to pass all parameters to the parser.

union Im_NetworkSession_UbiquitiDmPro01 (starttime, endtime, srcipaddr_has_any_prefix, dstipaddr_has_any_prefix, ipaddr_has_any_prefix, dstportnumber, hostname_has_any, dvcaction, eventresult, disabled)

and the Result is 🙂

Now, it’s time to check if the parser works as expected. I just try to get all RDP connections in my environment by leveraging the following query:

_Im_NetworkSession(dstportnumber=3389) 
| summarize count() by EventVendor

In summary, the Microsoft Sentinel ASIM Parser simplifies the complex task of data normalization and enrichment within the cybersecurity landscape. By leveraging standardized parsing functions and facilitating customization, it enables organizations to harness the full potential of their security data.

I hope this blog helps you to understand how ASIM works and you can create your own parsers.

Follow me

0 Comments

Leave a Reply

Avatar placeholder

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.