ScanEmail

Extracts and analyzes metadata and attachments from email messages, and generates thumbnails.

This scanner processes email files to extract and analyze metadata and attachments. It supports both plain text and HTML emails, including inline images.

Scanner Type: Collection

Detection Use Cases

  • Document Extraction
    • Extracts and analyzes documents, including attachments, from email messages for content review.
  • Email Header Analysis
    • Analyzes email headers for potential indicators of malicious activity, such as suspicious sender addresses or subject lines.
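
As a rough illustration of the header analysis described above, the sketch below pulls a few headers out of a raw message with Python's standard-library email package. This is not how the scanner parses mail (it relies on eml_parser); the function name summarize_headers and the sample message are invented for the example.

```python
import email


def summarize_headers(raw: bytes) -> dict:
    """Return a few header values that are commonly reviewed during triage."""
    msg = email.message_from_bytes(raw)
    message_id = msg.get("Message-ID", "")
    return {
        "subject": msg.get("Subject", ""),
        "from": msg.get("From", ""),
        "to": msg.get("To", ""),
        # Drop the surrounding angle brackets, as the scanner does for message_id
        "message_id": message_id.strip().lstrip("<").rstrip(">"),
    }


# Hypothetical sample message, used only for this illustration
sample = (
    b"From: foo.bar@example.com\r\n"
    b"To: baz.quk@example.com\r\n"
    b"Subject: Lorem Ipsum\r\n"
    b"Message-ID: <abc123@example.com>\r\n"
    b"\r\n"
    b"Lorem ipsum dolor sit amet.\r\n"
)
summary = summarize_headers(sample)
print(summary)
```

A detection rule could then compare values such as summary["from"] or summary["subject"] against whatever indicators an analyst maintains.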

Known Limitations

  • Email Encoding and Complex Structures
    • Limited support for certain email encodings or complex email structures.
  • Limited Output
    • Body content longer than 200 characters is truncated to its first and last 100 characters, joined by "...", to prevent excessive output.
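
The output limit can be sketched in isolation as follows. The 200 and 100 character thresholds come from the scanner source shown on this page; the helper name truncate_body and its parameters are hypothetical.

```python
def truncate_body(content: str, limit: int = 200, keep: int = 100) -> str:
    """Keep short bodies whole; otherwise keep only the head and tail."""
    if len(content) <= limit:
        return content
    return content[:keep] + "..." + content[-keep:]


short = truncate_body("hello")
long = truncate_body("a" * 150 + "b" * 150)
print(short)
print(len(long))
```

Keeping both the head and the tail preserves the greeting and the signature or trailing links, which are often the most useful parts of a body for triage.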

To Do

  • Improve Error Handling:
    • Enhance error handling for edge cases and complex email structures.
  • Enhance Support for Additional Email Encodings and Content Types:
    • Expand support for various email encodings and content types to improve scanning accuracy.

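References

  • Python Email Parsing Documentation: https://docs.python.org/3/library/email.html
  • PyMuPDF (fitz) Documentation: https://pymupdf.readthedocs.io/en/latest/

Contributors

  • Josh Liburdi (https://github.com/jshlbrd)
  • Paul Hutelmyer (https://github.com/phutelmyer)
  • Ryan O'Horo (https://github.com/ryanohoro)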

Source code in strelka/src/python/strelka/scanners/scan_email.py
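import base64
import email
import email.header
import email.message

import eml_parser
import pytz
from strelka import strelka


class ScanEmail(strelka.Scanner):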
    """
    Extracts and analyzes metadata and attachments from email messages, and generates thumbnails.

    This scanner processes email files to extract and analyze metadata and attachments.
    It supports both plain text and HTML emails, including inline images.

    Scanner Type: Collection

    Attributes:
        None

    ## Detection Use Cases
    !!! info "Detection Use Cases"
        - **Document Extraction**
            - Extracts and analyzes documents, including attachments, from email messages for content review.
        - **Email Header Analysis**
            - Analyzes email headers for potential indicators of malicious activity, such as suspicious sender addresses
            or subject lines.

    ## Known Limitations
    !!! warning "Known Limitations"
        - **Email Encoding and Complex Structures**
            - Limited support for certain email encodings or complex email structures.
        - **Limited Output**
            - Body content longer than 200 characters is truncated to its first and last 100 characters,
            joined by "...", to prevent excessive output.

    ## To Do
    !!! question "To Do"
        - **Improve Error Handling**:
            - Enhance error handling for edge cases and complex email structures.
        - **Enhance Support for Additional Email Encodings and Content Types**:
            - Expand support for various email encodings and content types to improve scanning accuracy.

    ## References
    !!! quote "References"
        - [Python Email Parsing Documentation](https://docs.python.org/3/library/email.html)
        - [PyMuPDF (fitz) Documentation](https://pymupdf.readthedocs.io/en/latest/)

    ## Contributors
    !!! example "Contributors"
        - [Josh Liburdi](https://github.com/jshlbrd)
        - [Paul Hutelmyer](https://github.com/phutelmyer)
        - [Ryan O'Horo](https://github.com/ryanohoro)

    """

    def scan(
        self,
        data: bytes,
        file: strelka.File,
        options: dict,
        expire_at: int,
    ) -> None:
        """
        Processes the email and extracts its metadata and attachments.

        Args:
            data (bytes): The raw email data.
            file (strelka.File): File details.
            options (dict): Scanner options.
            expire_at (int): Expiry time of the scan.

        Results are written to self.event; each attachment is emitted back
        to Strelka for further scanning.
        """

        # Initialize data structures for storing scan results
        attachments = []
        self.event["total"] = {"attachments": 0, "extracted": 0}

        # Parse email contents
        try:
            # Open and parse email byte string
            ep = eml_parser.EmlParser(
                include_attachment_data=True, include_raw_body=True
            )
            parsed_eml = ep.decode_email_bytes(data)

            # Check if email was parsed properly and attempt to deconflict and reload.
            if not (parsed_eml["header"]["subject"] and parsed_eml["header"]["header"]):
                if b"\nReceived: from " in data:
                    data = (
                        data.rpartition(b"\nReceived: from ")[1]
                        + data.rpartition(b"\nReceived: from ")[2]
                    )[1:]
                elif b"Start mail input; end with <CRLF>.<CRLF>\n" in data:
                    data = data.rpartition(
                        b"Start mail input; end with <CRLF>.<CRLF>\n"
                    )[2]
                parsed_eml = ep.decode_email_bytes(data)

            # Extract body content and domains
            if "body" in parsed_eml:
                for body in parsed_eml["body"]:
                    if "content_type" in body:
                        if body["content_type"] == "text/plain":
                            if len(body["content"]) <= 200:
                                self.event["body"] = body["content"]
                            else:
                                self.event["body"] = (
                                    body["content"][:100]
                                    + "..."
                                    + body["content"][-100:]
                                )
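                    else:
                        # No declared content type: apply the same 200-character limit
                        content = body["content"]
                        if len(content) <= 200:
                            self.event["body"] = content
                        else:
                            self.event["body"] = (
                                content[:100] + "..." + content[-100:]
                            )
                    if "domain" in body:
                        # Accumulate domains across body parts under the "domains" key
                        if "domains" in self.event:
                            self.event["domains"] += body["domain"]
                        else:
                            self.event["domains"] = body["domain"]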

            # Extract attachment details and raw data
            if "attachment" in parsed_eml:
                self.event["attachments"] = {
                    "filenames": [],
                    "hashes": [],
                    "totalsize": 0,
                }
                for attachment in parsed_eml["attachment"]:
                    self.event["attachments"]["filenames"].append(
                        attachment["filename"]
                    )
                    self.event["attachments"]["hashes"].append(
                        attachment["hash"]["md5"]
                    )
                    self.event["attachments"]["totalsize"] += attachment["size"]
                    attachments.append(
                        {
                            "name": attachment["filename"],
                            "content-type": attachment["content_header"][
                                "content-type"
                            ][0],
                            "raw": base64.b64decode(attachment["raw"]),
                        }
                    )

            # Extract email header information
            self.event["subject"] = parsed_eml["header"].get("subject", "")
            self.event["to"] = parsed_eml["header"].get("to", "")
            self.event["from"] = parsed_eml["header"].get("from", "")
            date_header = parsed_eml["header"].get("date")
            if date_header:
                self.event["date_utc"] = (
                    date_header.astimezone(pytz.utc).isoformat()[:-6] + ".000Z"
                )
            header = parsed_eml.get("header", {}).get("header", {})
            # Fall back to an empty string when the Message-ID header is missing
            message_ids = header.get("message-id") or [""]
            self.event["message_id"] = message_ids[0].lstrip("<").rstrip(">")
            self.event["received_domain"] = parsed_eml["header"].get(
                "received_domain", []
            )
            self.event["received_ip"] = parsed_eml["header"].get("received_ip", [])

            # Process attachments
            if attachments:
                for attachment in attachments:
                    self.event["total"]["attachments"] += 1
                    name = attachment["name"]
                    # Default to no flavors so emit_file still works if parsing fails
                    flavors = []
                    try:
                        flavors = [
                            attachment["content-type"]
                            .encode("utf-8")
                            .partition(b";")[0]
                        ]
                    except Exception as e:
                        self.flags.append(
                            f"{self.__class__.__name__}: email_extract_attachment_error: {str(e)[:50]}"
                        )
                    # Send extracted file back to Strelka
                    self.emit_file(attachment["raw"], name=name, flavors=flavors)
                    self.event["total"]["extracted"] += 1

        except Exception as e:
            self.flags.append(
                f"{self.__class__.__name__}: email_parse_error: {str(e)[:50]}"
            )

    @staticmethod
    def decode_and_format_header(msg: email.message.Message, header_name: str) -> str:
        """
        Decodes and safely formats a specific header field from an email message.

        Email headers can be encoded in various formats. This function decodes the header
        into a human-readable format, and also ensures that the text is safe for HTML display.

        Args:
            msg (email.message.Message): Parsed email message object.
            header_name (str): The name of the header field to decode.

        Returns:
            A string representing the decoded and formatted header field values.
            Returns a placeholder string if the header field is missing or cannot be decoded.

        """
        try:
            # Decode the specified header field
            decoded_header = email.header.decode_header(msg[header_name])[0]
            # Convert bytes to string if necessary
            field_value = decoded_header[0]
            if isinstance(field_value, bytes):
                field_value = field_value.decode(decoded_header[1] or "utf-8")
        except Exception:
            field_value = "&lt;Unknown&gt;"

        # Replace angle brackets for HTML safety
        return field_value.replace("<", "&lt;").replace(">", "&gt;")

scan(data, file, options, expire_at)

Processes the email and extracts its metadata and attachments.

Parameters:

Name       Type   Description               Default
data       bytes  The raw email data.       required
file       File   File details.             required
options    dict   Scanner options.          required
expire_at  int    Expiry time of the scan.  required

Results are written to self.event; each attachment is emitted back to Strelka for further scanning.

Source code in strelka/src/python/strelka/scanners/scan_email.py
def scan(
    self,
    data: bytes,
    file: strelka.File,
    options: dict,
    expire_at: int,
) -> None:
    """
    Processes the email and extracts its metadata and attachments.

    Args:
        data (bytes): The raw email data.
        file (strelka.File): File details.
        options (dict): Scanner options.
        expire_at (int): Expiry time of the scan.

    Results are written to self.event; each attachment is emitted back
    to Strelka for further scanning.
    """

    # Initialize data structures for storing scan results
    attachments = []
    self.event["total"] = {"attachments": 0, "extracted": 0}

    # Parse email contents
    try:
        # Open and parse email byte string
        ep = eml_parser.EmlParser(
            include_attachment_data=True, include_raw_body=True
        )
        parsed_eml = ep.decode_email_bytes(data)

        # Check if email was parsed properly and attempt to deconflict and reload.
        if not (parsed_eml["header"]["subject"] and parsed_eml["header"]["header"]):
            if b"\nReceived: from " in data:
                data = (
                    data.rpartition(b"\nReceived: from ")[1]
                    + data.rpartition(b"\nReceived: from ")[2]
                )[1:]
            elif b"Start mail input; end with <CRLF>.<CRLF>\n" in data:
                data = data.rpartition(
                    b"Start mail input; end with <CRLF>.<CRLF>\n"
                )[2]
            parsed_eml = ep.decode_email_bytes(data)

        # Extract body content and domains
        if "body" in parsed_eml:
            for body in parsed_eml["body"]:
                if "content_type" in body:
                    if body["content_type"] == "text/plain":
                        if len(body["content"]) <= 200:
                            self.event["body"] = body["content"]
                        else:
                            self.event["body"] = (
                                body["content"][:100]
                                + "..."
                                + body["content"][-100:]
                            )
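                else:
                    # No declared content type: apply the same 200-character limit
                    content = body["content"]
                    if len(content) <= 200:
                        self.event["body"] = content
                    else:
                        self.event["body"] = (
                            content[:100] + "..." + content[-100:]
                        )
                if "domain" in body:
                    # Accumulate domains across body parts under the "domains" key
                    if "domains" in self.event:
                        self.event["domains"] += body["domain"]
                    else:
                        self.event["domains"] = body["domain"]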

        # Extract attachment details and raw data
        if "attachment" in parsed_eml:
            self.event["attachments"] = {
                "filenames": [],
                "hashes": [],
                "totalsize": 0,
            }
            for attachment in parsed_eml["attachment"]:
                self.event["attachments"]["filenames"].append(
                    attachment["filename"]
                )
                self.event["attachments"]["hashes"].append(
                    attachment["hash"]["md5"]
                )
                self.event["attachments"]["totalsize"] += attachment["size"]
                attachments.append(
                    {
                        "name": attachment["filename"],
                        "content-type": attachment["content_header"][
                            "content-type"
                        ][0],
                        "raw": base64.b64decode(attachment["raw"]),
                    }
                )

        # Extract email header information
        self.event["subject"] = parsed_eml["header"].get("subject", "")
        self.event["to"] = parsed_eml["header"].get("to", "")
        self.event["from"] = parsed_eml["header"].get("from", "")
        date_header = parsed_eml["header"].get("date")
        if date_header:
            self.event["date_utc"] = (
                date_header.astimezone(pytz.utc).isoformat()[:-6] + ".000Z"
            )
        header = parsed_eml.get("header", {}).get("header", {})
        # Fall back to an empty string when the Message-ID header is missing
        message_ids = header.get("message-id") or [""]
        self.event["message_id"] = message_ids[0].lstrip("<").rstrip(">")
        self.event["received_domain"] = parsed_eml["header"].get(
            "received_domain", []
        )
        self.event["received_ip"] = parsed_eml["header"].get("received_ip", [])

        # Process attachments
        if attachments:
            for attachment in attachments:
                self.event["total"]["attachments"] += 1
                name = attachment["name"]
                # Default to no flavors so emit_file still works if parsing fails
                flavors = []
                try:
                    flavors = [
                        attachment["content-type"]
                        .encode("utf-8")
                        .partition(b";")[0]
                    ]
                except Exception as e:
                    self.flags.append(
                        f"{self.__class__.__name__}: email_extract_attachment_error: {str(e)[:50]}"
                    )
                # Send extracted file back to Strelka
                self.emit_file(attachment["raw"], name=name, flavors=flavors)
                self.event["total"]["extracted"] += 1

    except Exception as e:
        self.flags.append(
            f"{self.__class__.__name__}: email_parse_error: {str(e)[:50]}"
        )
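
The "deconflict and reload" step in scan() can be sketched on its own. When the first parse yields no subject or headers, the scanner looks for two byte markers and re-parses only the data that follows. The marker strings below are taken from the source above; the function name strip_preamble and the sample captures are hypothetical.

```python
RECEIVED_MARKER = b"\nReceived: from "
SMTP_DATA_MARKER = b"Start mail input; end with <CRLF>.<CRLF>\n"


def strip_preamble(data: bytes) -> bytes:
    """Drop bytes that precede the message, mirroring the scanner's fallback."""
    if RECEIVED_MARKER in data:
        # Keep the marker and everything after its last occurrence,
        # then drop the leading newline so the data starts at "Received:"
        _, sep, tail = data.rpartition(RECEIVED_MARKER)
        return (sep + tail)[1:]
    if SMTP_DATA_MARKER in data:
        # Keep only what follows the SMTP DATA prompt
        return data.rpartition(SMTP_DATA_MARKER)[2]
    return data


# Hypothetical captures, used only for this illustration
with_received = b"220 mx.example.com ESMTP\nReceived: from a.example.com\nSubject: hi\n\nbody\n"
with_prompt = b"354 Start mail input; end with <CRLF>.<CRLF>\nSubject: hi\n\nbody\n"
print(strip_preamble(with_received))
print(strip_preamble(with_prompt))
```

Because rpartition splits on the last occurrence, any earlier Received headers are discarded along with the preamble.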

decode_and_format_header(msg, header_name) staticmethod

Decodes and safely formats a specific header field from an email message.

Email headers can be encoded in various formats. This function decodes the header into a human-readable format, and also ensures that the text is safe for HTML display.

Parameters:

Name         Type     Description                              Default
msg          Message  Parsed email message object.             required
header_name  str      The name of the header field to decode.  required

Returns:

Type  Description
str   The decoded and formatted header field value, or a placeholder string if the header field is missing or cannot be decoded.
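
To make the decoding step concrete, here is a minimal standalone sketch of the same approach applied to an RFC 2047 encoded header. It follows the logic of the method's source but is written as a free function; the name decode_header_for_html and the sample subject are assumptions made for the example.

```python
import base64
import email.header
import email.message


def decode_header_for_html(msg: email.message.Message, header_name: str) -> str:
    """Decode the first chunk of a header and escape angle brackets."""
    try:
        value, charset = email.header.decode_header(msg[header_name])[0]
        # Encoded words come back as bytes together with their charset
        if isinstance(value, bytes):
            value = value.decode(charset or "utf-8")
    except Exception:
        # A missing header yields None, which decode_header cannot process
        value = "<Unknown>"
    return value.replace("<", "&lt;").replace(">", "&gt;")


# Build a base64 encoded-word subject for the illustration
msg = email.message.Message()
encoded = base64.b64encode("Hello <world>".encode("utf-8")).decode("ascii")
msg["Subject"] = f"=?utf-8?b?{encoded}?="

subject = decode_header_for_html(msg, "Subject")
missing = decode_header_for_html(msg, "X-Missing")
print(subject)
print(missing)
```

Only the first decoded chunk is used, as in the method itself, so a header that mixes several encoded words would be returned only in part.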

Source code in strelka/src/python/strelka/scanners/scan_email.py
@staticmethod
def decode_and_format_header(msg: email.message.Message, header_name: str) -> str:
    """
    Decodes and safely formats a specific header field from an email message.

    Email headers can be encoded in various formats. This function decodes the header
    into a human-readable format, and also ensures that the text is safe for HTML display.

    Args:
        msg (email.message.Message): Parsed email message object.
        header_name (str): The name of the header field to decode.

    Returns:
        A string representing the decoded and formatted header field values.
        Returns a placeholder string if the header field is missing or cannot be decoded.

    """
    try:
        # Decode the specified header field
        decoded_header = email.header.decode_header(msg[header_name])[0]
        # Convert bytes to string if necessary
        field_value = decoded_header[0]
        if isinstance(field_value, bytes):
            field_value = field_value.decode(decoded_header[1] or "utf-8")
    except Exception:
        field_value = "&lt;Unknown&gt;"

    # Replace angle brackets for HTML safety
    return field_value.replace("<", "&lt;").replace(">", "&gt;")

Features

The features of this scanner are detailed below. These features represent the capabilities and the type of analysis the scanner can perform. This may include support for Indicators of Compromise (IOC), the ability to emit files for further analysis, and the presence of extended documentation for complex analysis techniques.

Feature
Support
IOC Support
Emit Files
Extended Docs
Malware Scanner
Image Thumbnails

Tastes

Strelka's file distribution system assigns scanners to files based on 'flavors' and 'tastes'. Flavors describe the type of file, typically determined by MIME types from libmagic, matches from YARA rules, or characteristics of parent files. Tastes are the criteria used within Strelka to determine which scanners are applied to which files, with positive and negative tastes defining files to be included or excluded respectively.

Source Filetype
Include / Exclude
application/vnd.ms-outlook
email_file_broad
email_file
message/rfc822
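
The matching described above can be pictured with a small sketch. This is not Strelka's actual assignment code: the function scanner_applies is hypothetical, and treating all four listed flavors as positive tastes is an assumption made for illustration.

```python
def scanner_applies(file_flavors, positive, negative=()):
    """Return True when a file's flavors satisfy a scanner's tastes."""
    flavors = set(file_flavors)
    # A single negative match excludes the file
    if flavors.intersection(negative):
        return False
    # Otherwise at least one positive match is required
    return bool(flavors.intersection(positive))


# Assumed configuration: every flavor listed for this scanner is positive
EMAIL_TASTES = {
    "application/vnd.ms-outlook",
    "email_file_broad",
    "email_file",
    "message/rfc822",
}

is_email = scanner_applies(["message/rfc822"], EMAIL_TASTES)
is_pdf = scanner_applies(["application/pdf"], EMAIL_TASTES)
print(is_email, is_pdf)
```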

Scanner Fields

This section lists the fields the scanner extracts from each file it processes; they represent the scanner's analytical results. If the test file is missing or cannot be parsed, this section is empty.

Field Name             Field Type
attachments            dict
attachments.filenames  list
attachments.hashes     str
attachments.totalsize  int
body                   str
date_utc               str
domains                str
domains                list
elapsed                str
flags                  list
from                   str
message_id             str
received_domain        str
received_domain        list
received_ip            str
received_ip            list
subject                str
to                     str
to                     list
total                  dict
total.attachments      int
total.extracted        int
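
The attachments fields can be illustrated with a short sketch that builds the same structure from raw attachment bytes. In the scanner these values are read from eml_parser's output; here they are computed directly with hashlib, and the helper summarize_attachments and the sample data are hypothetical.

```python
import hashlib


def summarize_attachments(attachments):
    """Build a filenames / hashes / totalsize summary from (name, bytes) pairs."""
    summary = {"filenames": [], "hashes": [], "totalsize": 0}
    for name, raw in attachments:
        summary["filenames"].append(name)
        summary["hashes"].append(hashlib.md5(raw).hexdigest())
        summary["totalsize"] += len(raw)
    return summary


# Hypothetical attachment contents, used only for this illustration
attachment_summary = summarize_attachments(
    [("image001.jpg", b"abc"), ("test.doc", b"defgh")]
)
print(attachment_summary)
```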

Sample Event

Below is a sample event generated by this scanner, demonstrating the kind of output that can be expected when it processes a file. This sample is derived from a mock scan event configured in the scanner's test file. If no test file is available, this section will not display a sample event.

    from pytest_unordered import unordered

    test_scan_event = {
        "elapsed": 0.001,
        "flags": [],
        "total": {"attachments": 2, "extracted": 2},
        "body": "Lorem Ipsum\n\n[cid:image001.jpg@01D914BA.2B9507C0]\n\n\nLorem ipsum dolor sit amet, consectetur adipisci...tristique mi, quis finibus justo augue non ligula. Quisque facilisis dui in orci aliquet fermentum.\n",
        "domains": unordered(
            [
                "schemas.microsoft.com",
                "www.w3.org",
                "div.msonormal",
                "span.msohyperlink",
                "span.msohyperlinkfollowed",
                "span.emailstyle17",
                "1.0in",
                "div.wordsection1",
            ]
        ),
        "attachments": {
            "filenames": ["image001.jpg", "test.doc"],
            "hashes": unordered(
                [
                    "ee97b5bb7816b8ad3c3b4024a5d7ff06",
                    "33a13c0806ec35806889a93a5f259c7a",
                ]
            ),
            "totalsize": 72819,
        },
        "subject": "Lorem Ipsum",
        "to": unordered(["baz.quk@example.com"]),
        "from": "foo.bar@example.com",
        "date_utc": "2022-12-21T02:29:49.000Z",
        "message_id": "DS7PR03MB5640AD212589DFB7CE58D90CFBEB9@DS7PR03MB5640.namprd03.prod.outlook.com",
        "received_domain": unordered(
            [
                "ch2pr03mb5366.namprd03.prod.outlook.com",
                "mx0b-0020ab02.pphosted.com",
                "pps.filterd",
                "mx.example.com",
                "ds7pr03mb5640.namprd03.prod.outlook.com",
                "mx0a-0020ab02.pphosted.com",
            ]
        ),
        "received_ip": unordered(
            [
                "022.12.20.18",
                "fe80::bd8e:df17:2c2f:2490",
                "8.17.1.19",
                "2603:10b6:5:2c0::11",
                "205.220.177.243",
                "2603:10b6:610:96::16",
                "127.0.0.1",
                "2002:a05:6500:11d0:b0:17b:2a20:6c32",
            ]
        ),
    }
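
The date_utc value in the sample event follows from the scanner's conversion of the parsed Date header to UTC. The sketch below reproduces that formatting with the standard library's timezone.utc in place of pytz; the helper name to_date_utc and the original -05:00 offset are assumptions chosen so the result matches the sample.

```python
from datetime import datetime, timedelta, timezone


def to_date_utc(dt: datetime) -> str:
    """Convert an aware datetime to the UTC string format used in the event."""
    # isoformat() ends in "+00:00" after conversion; replace it with ".000Z"
    return dt.astimezone(timezone.utc).isoformat()[:-6] + ".000Z"


# Assumed local send time five hours behind UTC
sent = datetime(2022, 12, 20, 21, 29, 49, tzinfo=timezone(timedelta(hours=-5)))
date_utc = to_date_utc(sent)
print(date_utc)
```

The slicing assumes the datetime has no microseconds, which holds for values parsed from an email Date header; a datetime with microseconds would produce a malformed string.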