ScanUdf
Extracts files from UDF images
Source code in strelka/src/python/strelka/scanners/scan_udf.py
| class ScanUdf(strelka.Scanner):
"""Extracts files from UDF images"""
EXCLUDED_ROOT_DIRS = ["[SYSTEM]"]
def scan(self, data, file, options, expire_at):
file_limit = options.get("limit", 100)
tmp_directory = options.get("tmp_file_directory", "/tmp/")
scanner_timeout = options.get("scanner_timeout", 150)
self.event["total"] = {"files": 0, "extracted": 0}
self.event["files"] = []
self.event["hidden_dirs"] = []
self.event["meta"] = {}
try:
self.extract_7zip(
data, tmp_directory, scanner_timeout, expire_at, file_limit
)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_extract_error")
def extract_7zip(self, data, tmp_dir, scanner_timeout, expire_at, file_limit):
"""Decompress input file to /tmp with 7zz, send files to coordinator"""
# Check if 7zip package is installed
if not shutil.which("7zz"):
self.flags.append("vhd_7zip_not_installed_error")
return
with tempfile.NamedTemporaryFile(dir=tmp_dir, mode="wb") as tmp_data:
tmp_data.write(data)
tmp_data.flush()
tmp_data.seek(0)
if not tmp_data:
self.flags.append("vhd_7zip_tmp_error")
return
try:
with tempfile.TemporaryDirectory() as tmp_extract:
try:
(stdout, stderr) = subprocess.Popen(
["7zz", "x", tmp_data.name, f"-o{tmp_extract}"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate(timeout=scanner_timeout)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_extract_process_error")
def get_all_items(root, exclude=None):
"""Iterates through filesystem paths"""
if exclude is None:
exclude = []
for item in root.iterdir():
if item.name in exclude:
continue
yield item
if item.is_dir():
yield from get_all_items(item)
# Iterate over extracted files, except excluded paths
for name in get_all_items(
pathlib.Path(tmp_extract), self.EXCLUDED_ROOT_DIRS
):
if not name.is_file():
continue
if self.event["total"]["extracted"] >= file_limit:
self.flags.append("vhd_file_limit_error")
break
try:
relname = os.path.relpath(name, tmp_extract)
with open(name, "rb") as extracted_file:
# Send extracted file back to Strelka
self.emit_file(extracted_file.read(), name=relname)
self.event["total"]["extracted"] += 1
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_file_upload_error")
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_extract_error")
try:
(stdout, stderr) = subprocess.Popen(
["7zz", "l", tmp_data.name],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate(timeout=scanner_timeout)
self.parse_7zip_stdout(stdout.decode("utf-8"), file_limit)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_output_error")
return
def parse_7zip_stdout(self, output_7zip, file_limit):
"""Parse 7zz output, create metadata"""
mode = None
try:
output_lines = output_7zip.splitlines()
# 7-Zip (z) 24.09 (x64) : Copyright (c) 1999-2021 Igor Pavlov : 2021-12-26
regex_7zip_version = re.compile(r"^7-Zip[^\d]+(\d+\.\d+)")
# --/----
regex_mode_properties = re.compile(r"^(--|----)$")
# Comment =
regex_property = re.compile(r"^(.+) = (.+)$")
# Date Time Attr Size Compressed Name
regex_mode_files = re.compile(
r"\s+Date\s+Time\s+Attr\s+Size\s+Compressed\s+Name"
)
# 2022-12-05 17:23:59 ....A 100656 102400 lorem.txt
regex_file = re.compile(
r"(?P<datetime>\d+-\d+-\d+\s\d+:\d+:\d+)\s+(?P<modes>[A-Z.]{5})(?:\s+(?P<size>\d+))?(?:\s+(?P<compressed>\d+))?\s+(?P<name>.+)"
)
def parse_file_modes(file_modes):
file_mode_list = []
for file_mode in file_modes:
if file_mode == "D":
file_mode_list.append("directory")
elif file_mode == "R":
file_mode_list.append("readonly")
elif file_mode == "H":
file_mode_list.append("hidden")
elif file_mode == "S":
file_mode_list.append("system")
elif file_mode == "A":
file_mode_list.append("archivable")
return file_mode_list
partition = {}
for output_line in output_lines:
if output_line:
# Properties section
match = regex_mode_properties.match(output_line)
if match:
if "path" in partition.keys():
if not self.event.get("meta", {}).get("partitions", []):
self.event["meta"]["partitions"] = []
self.event["meta"]["partitions"].append(partition)
partition = {}
mode = "properties"
# File section
match = regex_mode_files.match(output_line)
if match:
# Wrap up final partition
if "path" in partition.keys():
if not self.event.get("meta", {}).get("partitions", []):
self.event["meta"]["partitions"] = []
self.event["meta"]["partitions"].append(partition)
partition = {}
mode = "files"
# Header section
if not mode:
match = regex_7zip_version.match(output_line)
if match:
version = regex_7zip_version.match(output_line).group(1)
self.event["meta"]["7zip_version"] = version
continue
elif mode == "properties":
# Collect specific properties
match = regex_property.match(output_line)
if match:
if match.group(1) == "Label":
partition["label"] = match.group(2)
elif match.group(1) == "Path":
partition["path"] = match.group(2)
elif match.group(1) == "Type":
partition["type"] = match.group(2)
elif match.group(1) == "Created":
partition["created"] = match.group(2)
elif match.group(1) == "Creator Application":
partition["creator_application"] = match.group(2)
elif match.group(1) == "File System":
partition["file_system"] = match.group(2)
elif mode == "files":
match = regex_file.match(output_line)
if match:
modes_list = parse_file_modes(match.group("modes"))
# Skip excluded paths
if (
os.path.normpath(match.group("name")).split(
os.path.sep
)[0]
in self.EXCLUDED_ROOT_DIRS
):
continue
# Matching ScanIso, collecting hidden directories separately
if "hidden" in modes_list and "directory" in modes_list:
self.event["hidden_dirs"].append(match.group("name"))
if "directory" not in modes_list:
self.event["total"]["files"] += 1
self.event["files"].append(
{
"filename": match.group("name"),
"size": match.group("size"),
"datetime": match.group("datetime"),
}
)
except Exception:
self.flags.append("vhd_7zip_parse_error")
return
def upload(self, name, expire_at):
"""Send extracted file to coordinator"""
with open(name, "rb") as extracted_file:
# Send extracted file back to Strelka
self.emit_file(
extracted_file.read(), name=os.path.basename(extracted_file.name)
)
|
Decompress input file to /tmp with 7zz, send files to coordinator
Source code in strelka/src/python/strelka/scanners/scan_udf.py
| def extract_7zip(self, data, tmp_dir, scanner_timeout, expire_at, file_limit):
"""Decompress input file to /tmp with 7zz, send files to coordinator"""
# Check if 7zip package is installed
if not shutil.which("7zz"):
self.flags.append("vhd_7zip_not_installed_error")
return
with tempfile.NamedTemporaryFile(dir=tmp_dir, mode="wb") as tmp_data:
tmp_data.write(data)
tmp_data.flush()
tmp_data.seek(0)
if not tmp_data:
self.flags.append("vhd_7zip_tmp_error")
return
try:
with tempfile.TemporaryDirectory() as tmp_extract:
try:
(stdout, stderr) = subprocess.Popen(
["7zz", "x", tmp_data.name, f"-o{tmp_extract}"],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate(timeout=scanner_timeout)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_extract_process_error")
def get_all_items(root, exclude=None):
"""Iterates through filesystem paths"""
if exclude is None:
exclude = []
for item in root.iterdir():
if item.name in exclude:
continue
yield item
if item.is_dir():
yield from get_all_items(item)
# Iterate over extracted files, except excluded paths
for name in get_all_items(
pathlib.Path(tmp_extract), self.EXCLUDED_ROOT_DIRS
):
if not name.is_file():
continue
if self.event["total"]["extracted"] >= file_limit:
self.flags.append("vhd_file_limit_error")
break
try:
relname = os.path.relpath(name, tmp_extract)
with open(name, "rb") as extracted_file:
# Send extracted file back to Strelka
self.emit_file(extracted_file.read(), name=relname)
self.event["total"]["extracted"] += 1
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_file_upload_error")
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_extract_error")
try:
(stdout, stderr) = subprocess.Popen(
["7zz", "l", tmp_data.name],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
).communicate(timeout=scanner_timeout)
self.parse_7zip_stdout(stdout.decode("utf-8"), file_limit)
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("vhd_7zip_output_error")
return
|
parse_7zip_stdout(output_7zip, file_limit)
Parse 7zz output, create metadata
Source code in strelka/src/python/strelka/scanners/scan_udf.py
| def parse_7zip_stdout(self, output_7zip, file_limit):
"""Parse 7zz output, create metadata"""
mode = None
try:
output_lines = output_7zip.splitlines()
# 7-Zip (z) 24.09 (x64) : Copyright (c) 1999-2021 Igor Pavlov : 2021-12-26
regex_7zip_version = re.compile(r"^7-Zip[^\d]+(\d+\.\d+)")
# --/----
regex_mode_properties = re.compile(r"^(--|----)$")
# Comment =
regex_property = re.compile(r"^(.+) = (.+)$")
# Date Time Attr Size Compressed Name
regex_mode_files = re.compile(
r"\s+Date\s+Time\s+Attr\s+Size\s+Compressed\s+Name"
)
# 2022-12-05 17:23:59 ....A 100656 102400 lorem.txt
regex_file = re.compile(
r"(?P<datetime>\d+-\d+-\d+\s\d+:\d+:\d+)\s+(?P<modes>[A-Z.]{5})(?:\s+(?P<size>\d+))?(?:\s+(?P<compressed>\d+))?\s+(?P<name>.+)"
)
def parse_file_modes(file_modes):
file_mode_list = []
for file_mode in file_modes:
if file_mode == "D":
file_mode_list.append("directory")
elif file_mode == "R":
file_mode_list.append("readonly")
elif file_mode == "H":
file_mode_list.append("hidden")
elif file_mode == "S":
file_mode_list.append("system")
elif file_mode == "A":
file_mode_list.append("archivable")
return file_mode_list
partition = {}
for output_line in output_lines:
if output_line:
# Properties section
match = regex_mode_properties.match(output_line)
if match:
if "path" in partition.keys():
if not self.event.get("meta", {}).get("partitions", []):
self.event["meta"]["partitions"] = []
self.event["meta"]["partitions"].append(partition)
partition = {}
mode = "properties"
# File section
match = regex_mode_files.match(output_line)
if match:
# Wrap up final partition
if "path" in partition.keys():
if not self.event.get("meta", {}).get("partitions", []):
self.event["meta"]["partitions"] = []
self.event["meta"]["partitions"].append(partition)
partition = {}
mode = "files"
# Header section
if not mode:
match = regex_7zip_version.match(output_line)
if match:
version = regex_7zip_version.match(output_line).group(1)
self.event["meta"]["7zip_version"] = version
continue
elif mode == "properties":
# Collect specific properties
match = regex_property.match(output_line)
if match:
if match.group(1) == "Label":
partition["label"] = match.group(2)
elif match.group(1) == "Path":
partition["path"] = match.group(2)
elif match.group(1) == "Type":
partition["type"] = match.group(2)
elif match.group(1) == "Created":
partition["created"] = match.group(2)
elif match.group(1) == "Creator Application":
partition["creator_application"] = match.group(2)
elif match.group(1) == "File System":
partition["file_system"] = match.group(2)
elif mode == "files":
match = regex_file.match(output_line)
if match:
modes_list = parse_file_modes(match.group("modes"))
# Skip excluded paths
if (
os.path.normpath(match.group("name")).split(
os.path.sep
)[0]
in self.EXCLUDED_ROOT_DIRS
):
continue
# Matching ScanIso, collecting hidden directories separately
if "hidden" in modes_list and "directory" in modes_list:
self.event["hidden_dirs"].append(match.group("name"))
if "directory" not in modes_list:
self.event["total"]["files"] += 1
self.event["files"].append(
{
"filename": match.group("name"),
"size": match.group("size"),
"datetime": match.group("datetime"),
}
)
except Exception:
self.flags.append("vhd_7zip_parse_error")
return
|
upload(name, expire_at)
Send extracted file to coordinator
Source code in strelka/src/python/strelka/scanners/scan_udf.py
| def upload(self, name, expire_at):
"""Send extracted file to coordinator"""
with open(name, "rb") as extracted_file:
# Send extracted file back to Strelka
self.emit_file(
extracted_file.read(), name=os.path.basename(extracted_file.name)
)
|
Features
The features of this scanner are detailed below. These features represent the capabilities and the type of analysis the scanner can perform. This may include support for Indicators of Compromise (IOC), the ability to emit files for further analysis, and the presence of extended documentation for complex analysis techniques.
Feature |
Support |
IOC Support |
|
Emit Files |
|
Extended Docs |
|
Malware Scanner |
|
Image Thumbnails |
|
Tastes
Strelka's file distribution system assigns scanners to files based on 'flavors' and 'tastes'. Flavors describe the type of file, typically determined by MIME types from libmagic, matches from YARA rules, or characteristics of parent files. Tastes are the criteria used within Strelka to determine which scanners are applied to which files, with positive and negative tastes defining files to be included or excluded respectively.
Source Filetype |
Include / Exclude |
udf_file |
|
Scanner Fields
This section provides a list of fields that are extracted from the files processed by this scanner. These fields include the data elements that the scanner extracts from each file, representing the analytical results produced by the scanner. If the test file is missing or cannot be parsed, this section will not contain any data.
Field Name |
Field Type |
elapsed |
str
|
files |
list
|
files.datetime |
str
|
files.filename |
str
|
files.size |
str
|
flags |
list
|
hidden_dirs |
list
|
meta |
dict
|
meta.7zip_version |
str
|
meta.partitions |
list
|
meta.partitions.created |
str
|
meta.partitions.path |
str
|
meta.partitions.type |
str
|
total |
dict
|
total.extracted |
int
|
total.files |
int
|
Sample Event
Below is a sample event generated by this scanner, demonstrating the kind of output that can be expected when it processes a file. This sample is derived from a mock scan event configured in the scanner's test file. If no test file is available, this section will not display a sample event.
test_scan_event = {
"elapsed": 0.001,
"flags": [],
"total": {"files": 1, "extracted": 1},
"files": [
{
"filename": "lorem.txt",
"size": "4015",
"datetime": "2022-12-12 03:12:55",
},
],
"hidden_dirs": [],
"meta": {
"7zip_version": "24.09",
"partitions": [
{
"path": 0.001,
"type": "Udf",
"created": 0.001,
},
],
},
}