Skip to content

ScanUdf

Extracts files from UDF images

Source code in strelka/src/python/strelka/scanners/scan_udf.py
class ScanUdf(strelka.Scanner):
    """Extracts files from UDF images

    The image is written to a temporary file, extracted with the ``7zz``
    command line tool, and the extracted files are emitted for further
    scanning.  Metadata is collected from the output of ``7zz l``.

    NOTE(review): the flag names use a "vhd_" prefix although this scanner
    handles UDF images; they are left unchanged because renaming them would
    alter the emitted events.
    """

    # Top-level directory names that are neither emitted nor listed
    EXCLUDED_ROOT_DIRS = ["[SYSTEM]"]

    def scan(self, data, file, options, expire_at):
        """Initialise the event and run the 7zz extraction.

        Args:
            data: Raw bytes of the image.
            file: Not used by this method.
            options: Scanner options; reads ``limit`` (default 100),
                ``tmp_file_directory`` (default ``/tmp/``) and
                ``scanner_timeout`` (default 150).
            expire_at: Passed through to ``extract_7zip``.

        Raises:
            strelka.ScannerTimeout: Re-raised unchanged.  Any other exception
                is recorded as the ``vhd_7zip_extract_error`` flag.
        """
        file_limit = options.get("limit", 100)
        tmp_directory = options.get("tmp_file_directory", "/tmp/")
        scanner_timeout = options.get("scanner_timeout", 150)

        self.event["total"] = {"files": 0, "extracted": 0}
        self.event["files"] = []
        self.event["hidden_dirs"] = []
        self.event["meta"] = {}

        try:
            self.extract_7zip(
                data, tmp_directory, scanner_timeout, expire_at, file_limit
            )

        except strelka.ScannerTimeout:
            raise
        except Exception:
            self.flags.append("vhd_7zip_extract_error")

    def extract_7zip(self, data, tmp_dir, scanner_timeout, expire_at, file_limit):
        """Decompress input file to /tmp with 7zz, send files to coordinator

        Writes ``data`` to a named temporary file under ``tmp_dir``, extracts
        it with ``7zz x`` into a temporary directory, emits up to
        ``file_limit`` extracted files through ``self.emit_file`` and finally
        passes the output of ``7zz l`` to ``self.parse_7zip_stdout``.

        Args:
            data: Raw bytes of the image to extract.
            tmp_dir: Directory in which the temporary copy of ``data`` is
                created.
            scanner_timeout: Timeout in seconds for each 7zz invocation.
            expire_at: Not used by this method.
            file_limit: Maximum number of extracted files to emit.

        Raises:
            strelka.ScannerTimeout: Re-raised unchanged.  Other exceptions
                raised while extracting, emitting or listing are recorded in
                ``self.flags``.
        """

        # Check if 7zip package is installed
        if not shutil.which("7zz"):
            self.flags.append("vhd_7zip_not_installed_error")
            return

        with tempfile.NamedTemporaryFile(dir=tmp_dir, mode="wb") as tmp_data:
            tmp_data.write(data)
            # 7zz opens the file by name, so the bytes must be on disk first
            tmp_data.flush()

            try:
                with tempfile.TemporaryDirectory() as tmp_extract:
                    try:
                        # subprocess.run() kills and waits for the child when
                        # the timeout expires.  Popen(...).communicate(
                        # timeout=...) leaves 7zz running while the extraction
                        # directory is being removed.
                        subprocess.run(
                            ["7zz", "x", tmp_data.name, f"-o{tmp_extract}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL,
                            timeout=scanner_timeout,
                        )
                    except strelka.ScannerTimeout:
                        raise
                    except Exception:
                        self.flags.append("vhd_7zip_extract_process_error")

                    def get_all_items(root, exclude=None):
                        """Iterates through filesystem paths"""
                        if exclude is None:
                            exclude = []
                        for item in root.iterdir():
                            if item.name in exclude:
                                continue
                            yield item
                            if item.is_dir():
                                # The exclusion list is not passed down, so it
                                # only applies to the directory it was given for
                                yield from get_all_items(item)

                    # Iterate over extracted files, except excluded paths
                    for name in get_all_items(
                        pathlib.Path(tmp_extract), self.EXCLUDED_ROOT_DIRS
                    ):
                        if not name.is_file():
                            continue

                        if self.event["total"]["extracted"] >= file_limit:
                            self.flags.append("vhd_file_limit_error")
                            break

                        try:
                            relname = os.path.relpath(name, tmp_extract)
                            with open(name, "rb") as extracted_file:
                                # Send extracted file back to Strelka
                                self.emit_file(extracted_file.read(), name=relname)

                            self.event["total"]["extracted"] += 1
                        except strelka.ScannerTimeout:
                            raise
                        except Exception:
                            self.flags.append("vhd_file_upload_error")

            except strelka.ScannerTimeout:
                raise
            except Exception:
                self.flags.append("vhd_7zip_extract_error")

            try:
                # Same timeout handling as the extraction call above
                listing = subprocess.run(
                    ["7zz", "l", tmp_data.name],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.DEVNULL,
                    timeout=scanner_timeout,
                )

                self.parse_7zip_stdout(listing.stdout.decode("utf-8"), file_limit)

            except strelka.ScannerTimeout:
                raise
            except Exception:
                self.flags.append("vhd_7zip_output_error")

    def parse_7zip_stdout(self, output_7zip, file_limit):
        """Parse 7zz output, create metadata

        Walks the text printed by ``7zz l`` line by line and fills
        ``self.event``: the 7-Zip version and the collected properties go
        under ``meta``, non-directory entries are appended to ``files`` (and
        counted in ``total.files``), and directories carrying the hidden
        attribute are appended to ``hidden_dirs``.  Entries whose first path
        component is in ``EXCLUDED_ROOT_DIRS`` are ignored.  Any exception
        raised while parsing is recorded as the ``vhd_7zip_parse_error`` flag.
        ``file_limit`` is accepted but not used.
        """

        # Attribute letter -> name used in the event
        attribute_names = {
            "D": "directory",
            "R": "readonly",
            "H": "hidden",
            "S": "system",
            "A": "archivable",
        }

        # 7zz property name -> key stored on the partition record
        property_keys = {
            "Label": "label",
            "Path": "path",
            "Type": "type",
            "Created": "created",
            "Creator Application": "creator_application",
            "File System": "file_system",
        }

        mode = None

        try:
            # 7-Zip (z) 24.09 (x64) : Copyright (c) 1999-2021 Igor Pavlov : 2021-12-26
            regex_7zip_version = re.compile(r"^7-Zip[^\d]+(\d+\.\d+)")

            # --/----
            regex_mode_properties = re.compile(r"^(--|----)$")

            # Comment =
            regex_property = re.compile(r"^(.+) = (.+)$")

            #    Date      Time    Attr         Size   Compressed  Name
            regex_mode_files = re.compile(
                r"\s+Date\s+Time\s+Attr\s+Size\s+Compressed\s+Name"
            )

            # 2022-12-05 17:23:59 ....A       100656       102400  lorem.txt
            regex_file = re.compile(
                r"(?P<datetime>\d+-\d+-\d+\s\d+:\d+:\d+)\s+(?P<modes>[A-Z.]{5})(?:\s+(?P<size>\d+))?(?:\s+(?P<compressed>\d+))?\s+(?P<name>.+)"
            )

            def store_partition(record):
                """Append a partition record to the event if it has a path"""
                if "path" not in record:
                    return
                if not self.event.get("meta", {}).get("partitions", []):
                    self.event["meta"]["partitions"] = []
                self.event["meta"]["partitions"].append(record)

            partition = {}

            for output_line in output_7zip.splitlines():
                if not output_line:
                    continue

                # A properties marker or the file table header closes the
                # partition collected so far and switches the section
                if regex_mode_properties.match(output_line):
                    store_partition(partition)
                    partition = {}
                    mode = "properties"

                if regex_mode_files.match(output_line):
                    store_partition(partition)
                    partition = {}
                    mode = "files"

                # Header section
                if not mode:
                    match = regex_7zip_version.match(output_line)
                    if match:
                        self.event["meta"]["7zip_version"] = match.group(1)

                elif mode == "properties":
                    # Collect specific properties
                    match = regex_property.match(output_line)
                    if match and match.group(1) in property_keys:
                        partition[property_keys[match.group(1)]] = match.group(2)

                elif mode == "files":
                    match = regex_file.match(output_line)
                    if not match:
                        continue

                    modes_list = [
                        attribute_names[letter]
                        for letter in match.group("modes")
                        if letter in attribute_names
                    ]
                    entry_name = match.group("name")

                    # Skip excluded paths
                    root_dir = os.path.normpath(entry_name).split(os.path.sep)[0]
                    if root_dir in self.EXCLUDED_ROOT_DIRS:
                        continue

                    is_directory = "directory" in modes_list

                    # Matching ScanIso, collecting hidden directories separately
                    if is_directory and "hidden" in modes_list:
                        self.event["hidden_dirs"].append(entry_name)

                    if not is_directory:
                        self.event["total"]["files"] += 1
                        self.event["files"].append(
                            {
                                "filename": entry_name,
                                "size": match.group("size"),
                                "datetime": match.group("datetime"),
                            }
                        )

        except Exception:
            self.flags.append("vhd_7zip_parse_error")
            return

    def upload(self, name, expire_at):
        """Send extracted file to coordinator

        Reads the file at ``name`` and emits its content under the file's
        base name.  ``expire_at`` is accepted but not used.
        """
        with open(name, "rb") as extracted_file:
            # Send extracted file back to Strelka
            self.emit_file(
                extracted_file.read(), name=os.path.basename(extracted_file.name)
            )

extract_7zip(data, tmp_dir, scanner_timeout, expire_at, file_limit)

Decompress input file to /tmp with 7zz, send files to coordinator

Source code in strelka/src/python/strelka/scanners/scan_udf.py
def extract_7zip(self, data, tmp_dir, scanner_timeout, expire_at, file_limit):
    """Decompress input file to /tmp with 7zz, send files to coordinator

    Writes ``data`` to a named temporary file under ``tmp_dir``, extracts it
    with ``7zz x`` into a temporary directory, emits up to ``file_limit``
    extracted files through ``self.emit_file`` and finally passes the output
    of ``7zz l`` to ``self.parse_7zip_stdout``.

    Args:
        data: Raw bytes of the image to extract.
        tmp_dir: Directory in which the temporary copy of ``data`` is created.
        scanner_timeout: Timeout in seconds for each 7zz invocation.
        expire_at: Not used by this method.
        file_limit: Maximum number of extracted files to emit.

    Raises:
        strelka.ScannerTimeout: Re-raised unchanged.  Other exceptions raised
            while extracting, emitting or listing are recorded in
            ``self.flags``.
    """

    # Check if 7zip package is installed
    if not shutil.which("7zz"):
        self.flags.append("vhd_7zip_not_installed_error")
        return

    with tempfile.NamedTemporaryFile(dir=tmp_dir, mode="wb") as tmp_data:
        tmp_data.write(data)
        # 7zz opens the file by name, so the bytes must be on disk first
        tmp_data.flush()

        try:
            with tempfile.TemporaryDirectory() as tmp_extract:
                try:
                    # subprocess.run() kills and waits for the child when the
                    # timeout expires.  Popen(...).communicate(timeout=...)
                    # leaves 7zz running while the extraction directory is
                    # being removed.
                    subprocess.run(
                        ["7zz", "x", tmp_data.name, f"-o{tmp_extract}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        timeout=scanner_timeout,
                    )
                except strelka.ScannerTimeout:
                    raise
                except Exception:
                    self.flags.append("vhd_7zip_extract_process_error")

                def get_all_items(root, exclude=None):
                    """Iterates through filesystem paths"""
                    if exclude is None:
                        exclude = []
                    for item in root.iterdir():
                        if item.name in exclude:
                            continue
                        yield item
                        if item.is_dir():
                            # The exclusion list is not passed down, so it
                            # only applies to the directory it was given for
                            yield from get_all_items(item)

                # Iterate over extracted files, except excluded paths
                for name in get_all_items(
                    pathlib.Path(tmp_extract), self.EXCLUDED_ROOT_DIRS
                ):
                    if not name.is_file():
                        continue

                    if self.event["total"]["extracted"] >= file_limit:
                        self.flags.append("vhd_file_limit_error")
                        break

                    try:
                        relname = os.path.relpath(name, tmp_extract)
                        with open(name, "rb") as extracted_file:
                            # Send extracted file back to Strelka
                            self.emit_file(extracted_file.read(), name=relname)

                        self.event["total"]["extracted"] += 1
                    except strelka.ScannerTimeout:
                        raise
                    except Exception:
                        self.flags.append("vhd_file_upload_error")

        except strelka.ScannerTimeout:
            raise
        except Exception:
            self.flags.append("vhd_7zip_extract_error")

        try:
            # Same timeout handling as the extraction call above
            listing = subprocess.run(
                ["7zz", "l", tmp_data.name],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                timeout=scanner_timeout,
            )

            self.parse_7zip_stdout(listing.stdout.decode("utf-8"), file_limit)

        except strelka.ScannerTimeout:
            raise
        except Exception:
            self.flags.append("vhd_7zip_output_error")

parse_7zip_stdout(output_7zip, file_limit)

Parse 7zz output, create metadata

Source code in strelka/src/python/strelka/scanners/scan_udf.py
def parse_7zip_stdout(self, output_7zip, file_limit):
    """Parse 7zz output, create metadata

    Walks the text printed by ``7zz l`` line by line and fills ``self.event``:
    the 7-Zip version and the collected properties go under ``meta``,
    non-directory entries are appended to ``files`` (and counted in
    ``total.files``), and directories carrying the hidden attribute are
    appended to ``hidden_dirs``.  Entries whose first path component is in
    ``EXCLUDED_ROOT_DIRS`` are ignored.  Any exception raised while parsing is
    recorded as the ``vhd_7zip_parse_error`` flag.  ``file_limit`` is accepted
    but not used.
    """

    # Attribute letter -> name used in the event
    attribute_names = {
        "D": "directory",
        "R": "readonly",
        "H": "hidden",
        "S": "system",
        "A": "archivable",
    }

    # 7zz property name -> key stored on the partition record
    property_keys = {
        "Label": "label",
        "Path": "path",
        "Type": "type",
        "Created": "created",
        "Creator Application": "creator_application",
        "File System": "file_system",
    }

    section = None

    try:
        # 7-Zip (z) 24.09 (x64) : Copyright (c) 1999-2021 Igor Pavlov : 2021-12-26
        version_re = re.compile(r"^7-Zip[^\d]+(\d+\.\d+)")

        # --/----
        properties_marker_re = re.compile(r"^(--|----)$")

        # Comment =
        property_re = re.compile(r"^(.+) = (.+)$")

        #    Date      Time    Attr         Size   Compressed  Name
        files_header_re = re.compile(
            r"\s+Date\s+Time\s+Attr\s+Size\s+Compressed\s+Name"
        )

        # 2022-12-05 17:23:59 ....A       100656       102400  lorem.txt
        entry_re = re.compile(
            r"(?P<datetime>\d+-\d+-\d+\s\d+:\d+:\d+)\s+(?P<modes>[A-Z.]{5})(?:\s+(?P<size>\d+))?(?:\s+(?P<compressed>\d+))?\s+(?P<name>.+)"
        )

        def store_partition(record):
            """Append a partition record to the event if it has a path"""
            if "path" not in record:
                return
            if not self.event.get("meta", {}).get("partitions", []):
                self.event["meta"]["partitions"] = []
            self.event["meta"]["partitions"].append(record)

        current = {}

        for line in output_7zip.splitlines():
            if not line:
                continue

            # A properties marker or the file table header closes the
            # partition collected so far and switches the section
            if properties_marker_re.match(line):
                store_partition(current)
                current = {}
                section = "properties"

            if files_header_re.match(line):
                store_partition(current)
                current = {}
                section = "files"

            if not section:
                # Header section: only the version line is of interest
                found = version_re.match(line)
                if found:
                    self.event["meta"]["7zip_version"] = found.group(1)

            elif section == "properties":
                # Keep only the properties listed in property_keys
                found = property_re.match(line)
                if found and found.group(1) in property_keys:
                    current[property_keys[found.group(1)]] = found.group(2)

            elif section == "files":
                found = entry_re.match(line)
                if not found:
                    continue

                attributes = [
                    attribute_names[letter]
                    for letter in found.group("modes")
                    if letter in attribute_names
                ]
                entry_name = found.group("name")

                # Skip excluded paths
                root_dir = os.path.normpath(entry_name).split(os.path.sep)[0]
                if root_dir in self.EXCLUDED_ROOT_DIRS:
                    continue

                is_directory = "directory" in attributes

                # Matching ScanIso, collecting hidden directories separately
                if is_directory and "hidden" in attributes:
                    self.event["hidden_dirs"].append(entry_name)

                if not is_directory:
                    self.event["total"]["files"] += 1
                    self.event["files"].append(
                        {
                            "filename": entry_name,
                            "size": found.group("size"),
                            "datetime": found.group("datetime"),
                        }
                    )

    except Exception:
        self.flags.append("vhd_7zip_parse_error")
        return

upload(name, expire_at)

Send extracted file to coordinator

Source code in strelka/src/python/strelka/scanners/scan_udf.py
def upload(self, name, expire_at):
    """Send extracted file to coordinator

    Reads the file at ``name`` and emits its content under the file's base
    name.  ``expire_at`` is accepted but not used.
    """
    with open(name, "rb") as handle:
        payload = handle.read()
        emitted_name = os.path.basename(handle.name)

    # Send extracted file back to Strelka
    self.emit_file(payload, name=emitted_name)

Features

The features of this scanner are detailed below. These features represent the capabilities and the type of analysis the scanner can perform. This may include support for Indicators of Compromise (IOC), the ability to emit files for further analysis, and the presence of extended documentation for complex analysis techniques.

Feature
Support
IOC Support
Emit Files
Extended Docs
Malware Scanner
Image Thumbnails

Tastes

Strelka's file distribution system assigns scanners to files based on 'flavors' and 'tastes'. Flavors describe the type of file, typically determined by MIME types from libmagic, matches from YARA rules, or characteristics of parent files. Tastes are the criteria used within Strelka to determine which scanners are applied to which files, with positive and negative tastes defining files to be included or excluded respectively.

Source Filetype
Include / Exclude
udf_file

Scanner Fields

This section provides a list of fields that are extracted from the files processed by this scanner. These fields include the data elements that the scanner extracts from each file, representing the analytical results produced by the scanner. If the test file is missing or cannot be parsed, this section will not contain any data.

Field Name
Field Type
elapsed
str
files
list
files.datetime
str
files.filename
str
files.size
str
flags
list
hidden_dirs
list
meta
dict
meta.7zip_version
str
meta.partitions
list
meta.partitions.created
str
meta.partitions.path
str
meta.partitions.type
str
total
dict
total.extracted
int
total.files
int

Sample Event

Below is a sample event generated by this scanner, demonstrating the kind of output that can be expected when it processes a file. This sample is derived from a mock scan event configured in the scanner's test file. If no test file is available, this section will not display a sample event.

    # Sample event for a UDF image containing a single file, lorem.txt.
    # NOTE(review): "path" and "created" are listed as str fields in the field
    # table, but this sample shows 0.001 for both — presumably placeholder
    # values; confirm against the scanner's test file.
    test_scan_event = {
        "elapsed": 0.001,
        "flags": [],
        "total": {"files": 1, "extracted": 1},
        "files": [
            {
                "filename": "lorem.txt",
                "size": "4015",
                "datetime": "2022-12-12 03:12:55",
            },
        ],
        "hidden_dirs": [],
        "meta": {
            "7zip_version": "24.09",
            "partitions": [
                {
                    "path": 0.001,
                    "type": "Udf",
                    "created": 0.001,
                },
            ],
        },
    }