Skip to content

Scanners API Reference

cloudaudit.scanners.secret_scanner

cloudaudit — Secret & Sensitive Data Scanner

Analyses text file content for
  • Cloud provider credentials (AWS, GCP, Azure)
  • Auth tokens (JWT, OAuth, GitHub/GitLab)
  • Private keys and certificates
  • Database connection strings with credentials
  • Hardcoded passwords
  • PII indicators (emails, phone numbers)
  • Internal infrastructure hints
  • Terraform state secrets
  • CI/CD pipeline secrets

All matches are redacted before being stored in findings. The full secret value is never logged or stored.

SecretScanner

Scan text content for secrets, credentials, and sensitive data.

Findings include redacted matches only — the actual secret value is never stored in the Finding object.

Source code in cloudaudit/scanners/secret_scanner.py
class SecretScanner:
    """
    Scan text content for secrets, credentials, and sensitive data.

    Findings include redacted matches only — the actual secret value
    is never stored in the Finding object.  This applies to both the
    ``match`` field and the surrounding ``context`` snippet.
    """

    def __init__(self, min_entropy: float = 3.5) -> None:
        """
        Compile every rule in ``_PATTERNS`` once so ``scan`` can reuse them.

        Args:
            min_entropy: Entropy threshold below which MEDIUM/LOW severity
                matches are dropped as likely placeholders.
        """
        self._min_entropy = min_entropy
        self._compiled = [
            (p, re.compile(p.pattern, re.MULTILINE))
            for p in _PATTERNS
        ]

    def scan(self, content: str, file_url: str, file_type: FileType) -> List[Finding]:
        """
        Run every compiled pattern over ``content`` and build findings.

        Args:
            content: Decoded text of the file being analysed.
            file_url: URL the content was fetched from (stored on each finding).
            file_type: Classification of the file (stored on each finding).

        Returns:
            One ``Finding`` per surviving match, in pattern order and then
            match order.  An empty list when nothing matches.
        """
        findings: List[Finding] = []

        for pattern, regex in self._compiled:
            # The context window is lower-cased below, so the keywords must be
            # lower-cased as well or a mixed-case keyword could never match.
            keywords = [k.lower() for k in (pattern.context_required or ())]

            for m in regex.finditer(content):
                # Prefer group(1) if the pattern declares a capturing group,
                # otherwise fall back to the full match.
                matched = m.group(1) if regex.groups else m.group(0)
                if not matched:
                    continue

                # Apply custom validator if defined
                if pattern.validation and not pattern.validation(matched):
                    continue

                # Context keyword requirement (±150 chars around the match)
                if keywords:
                    window = content[max(0, m.start() - 150): m.end() + 150].lower()
                    if not any(k in window for k in keywords):
                        continue

                # Entropy gate: very low-entropy strings are likely false positives
                ent = calculate_entropy(matched)
                effective_severity = pattern.severity
                if ent < self._min_entropy and pattern.severity in (Severity.MEDIUM, Severity.LOW):
                    continue   # skip — likely a placeholder or example
                if ent > 5.2 and effective_severity == Severity.MEDIUM:
                    effective_severity = Severity.HIGH   # high-entropy medium → escalate

                confidence = self._confidence(matched, ent, pattern)

                # Redact the actual value before storing.  The context snippet
                # is cut from the raw content and therefore still contains the
                # secret; mask it explicitly because _sanitise_context only
                # catches long base64/hex-looking runs.
                redacted_match = redact(matched)
                context_snip = self._context_snippet(content, m.start(), radius=3)
                context_snip = context_snip.replace(matched, redacted_match)

                findings.append(Finding(
                    file_url=file_url,
                    file_name=url_filename(file_url),
                    file_type=file_type,
                    category=pattern.category,
                    rule_name=pattern.name,
                    description=pattern.description,
                    severity=effective_severity,
                    match=redacted_match,
                    context=self._sanitise_context(context_snip),
                    # Bounded count avoids copying the content prefix per match.
                    line_number=content.count("\n", 0, m.start()) + 1,
                    recommendation=pattern.recommendation,
                    compliance_refs=pattern.compliance,
                    confidence=confidence,
                    scanner="SecretScanner",
                ))

        return findings

    # ── Helpers ────────────────────────────────────────────────────────────────

    @staticmethod
    def _context_snippet(content: str, pos: int, radius: int = 3) -> str:
        """
        Return the line containing character offset ``pos`` plus up to
        ``radius`` lines on either side, joined with newlines.

        Returns an empty string when ``pos`` lies beyond the end of ``content``.
        """
        lines = content.split("\n")
        cur   = 0
        for i, line in enumerate(lines):
            # +1 accounts for the newline that split() removed.
            if cur + len(line) + 1 > pos:
                lo = max(0, i - radius)
                hi = min(len(lines), i + radius + 1)
                return "\n".join(lines[lo:hi])
            cur += len(line) + 1
        return ""

    @staticmethod
    def _sanitise_context(snippet: str) -> str:
        """
        Light sanitisation of context lines — remove obvious secret values
        while preserving line structure so analysts can understand the finding.

        The result is capped at 800 characters.
        """
        # Redact anything that looks like a long base64 / hex value
        sanitised = re.sub(
            r"([A-Za-z0-9+/=]{40,})",
            lambda m: redact(m.group(1)),
            snippet,
        )
        return sanitised[:800]  # cap context length

    @staticmethod
    def _confidence(matched: str, entropy: float, pattern: Pattern) -> float:
        """
        Heuristic confidence score, capped at 1.0.

        Starts at 0.4 and adds bonuses for a match longer than 20 characters,
        entropy above 4.0, entropy above 5.0, and the pattern having a
        validator.
        """
        conf = 0.4
        if len(matched) > 20:
            conf += 0.15
        if entropy > 4.0:
            conf += 0.20
        if entropy > 5.0:
            conf += 0.15
        if pattern.validation:
            conf += 0.10   # validated patterns are more reliable
        return min(conf, 1.0)

cloudaudit.scanners.container_detector

cloudaudit — Container Type Auto-Detector

Inspects the HTTP response (headers + body) from the target URL and determines the cloud storage provider, container name, and region without requiring the user to specify them manually.

Detection uses
  1. HTTP response headers (x-amz-*, x-goog-*, x-ms-*, Server)
  2. XML namespace from the list response body
  3. Hostname pattern matching
  4. Response body element fingerprinting (<ListBucketResult>, <EnumerationResults>, etc.)

ContainerDetector

Analyse a URL + HTTP response to identify the cloud storage provider and extract container/bucket metadata automatically.

Source code in cloudaudit/scanners/container_detector.py
class ContainerDetector:
    """
    Analyse a URL + HTTP response to identify the cloud storage provider
    and extract container/bucket metadata automatically.
    """

    def detect(
        self,
        url: str,
        response: aiohttp.ClientResponse,
        body: str,
    ) -> ContainerInfo:
        """
        Build a ``ContainerInfo`` for ``url`` from the response and its body.

        Detection runs in stages: response headers, hostname patterns,
        XML body fingerprinting, then an HTML-listing fallback.  Header
        metadata and the public-access flag are filled in last.

        Args:
            url: The target URL that was requested.
            response: The HTTP response; only ``headers`` and ``status`` are read.
            body: The decoded response body.

        Returns:
            The populated ``ContainerInfo``.
        """
        headers = {k.lower(): v for k, v in response.headers.items()}
        host    = urlparse(url).hostname or ""

        info = ContainerInfo(raw_url=url)

        # ── 1. Try header-based detection first (fastest) ──────────────────────
        if any(h in headers for h in AWS_S3_HEADERS):
            info.container_type = ContainerType.AWS_S3
        elif any(h in headers for h in GCS_HEADERS):
            info.container_type = ContainerType.GCS
        elif any(h in headers for h in AZURE_HEADERS):
            info.container_type = ContainerType.AZURE_BLOB

        # ── 2. Hostname pattern matching ───────────────────────────────────────
        if info.container_type == ContainerType.UNKNOWN:
            if m := _AWS_S3_HOST_RE.search(host):
                info.container_type = ContainerType.AWS_S3
                if m.group("region"):
                    info.region = m.group("region")
                if m.group("bucket"):
                    info.container_name = m.group("bucket")

            elif _AWS_CF_HOST_RE.search(host):
                info.container_type = ContainerType.CLOUDFRONT

            elif gm := _GCS_HOST_RE.search(host):
                info.container_type = ContainerType.GCS
                if gm.group("bucket"):
                    info.container_name = gm.group("bucket")

            elif am := _AZURE_HOST_RE.search(host):
                info.container_type = ContainerType.AZURE_BLOB
                info.container_name = am.group("account")

        # ── 3. XML body fingerprinting (most reliable for content) ────────────
        # Any body starting with "<" is attempted (this also covers "<?xml");
        # non-XML markup simply fails to parse and is ignored.
        if body.lstrip().startswith("<"):
            self._parse_xml_body(body, info)

        # ── 4. HTML open directory detection ─────────────────────────────────
        if info.container_type == ContainerType.UNKNOWN:
            if re.search(r"<title>Index of", body, re.IGNORECASE):
                info.container_type = ContainerType.OPEN_DIRECTORY
            elif re.search(r"<a\s+href=", body, re.IGNORECASE):
                info.container_type = ContainerType.GENERIC

        # ── 5. Fill in metadata from response headers ─────────────────────────
        info.server_header = headers.get("server", "")
        if region := headers.get("x-amz-bucket-region", ""):
            info.region = region
        info.extra_headers = {
            k: v for k, v in headers.items()
            if k in ("server", "content-type", "x-amz-bucket-region",
                     "x-goog-stored-content-encoding", "x-ms-version")
        }

        # ── 6. Assess public access ───────────────────────────────────────────
        # A 200 response to an unauthenticated request is treated as public.
        info.is_public = (response.status == 200)
        if info.is_public and info.container_type == ContainerType.AWS_S3:
            info.notes.append(
                "S3 Block Public Access is NOT enabled — bucket listing is publicly accessible."
            )

        return info

    # ── XML parsing helpers ────────────────────────────────────────────────────

    def _parse_xml_body(self, body: str, info: ContainerInfo) -> None:
        """
        Extract container name and type from an XML listing body.

        Mutates ``info`` in place.  Bodies that are not well-formed XML are
        ignored silently.
        """
        try:
            root = ET.fromstring(body)
        except ET.ParseError:
            return

        tag = root.tag
        ns  = ""
        if tag.startswith("{"):
            ns, _, local = tag[1:].partition("}")
        else:
            local = tag

        def _qualified(name: str) -> str:
            # Child elements share the root's namespace.
            return f"{{{ns}}}{name}" if ns else name

        # AWS S3: <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
        # The GCS XML API returns the same <ListBucketResult> schema, so an
        # earlier GCS verdict from headers/hostname must not be overwritten.
        if local in ("ListBucketResult", "ListAllMyBucketsResult") or ns in AWS_S3_NAMESPACES:
            if info.container_type != ContainerType.GCS:
                info.container_type = ContainerType.AWS_S3
            if not info.container_name:
                name_el = root.find(_qualified("Name"))
                if name_el is not None and name_el.text:
                    info.container_name = name_el.text.strip()
            region_el = root.find(_qualified("BucketRegion"))
            if region_el is not None and region_el.text:
                info.region = region_el.text.strip()

        # Azure Blob: <EnumerationResults ServiceEndpoint="..." ContainerName="...">
        elif local == "EnumerationResults":
            info.container_type = ContainerType.AZURE_BLOB

            # The container name may be a child element or a root attribute;
            # the attribute can be a bare name or a full container URL.
            name = ""
            cont_el = root.find("ContainerName")
            if cont_el is not None and cont_el.text:
                name = cont_el.text.strip()
            elif attr := root.get("ContainerName", ""):
                name = attr.rstrip("/").rsplit("/", 1)[-1]
            if name:
                info.container_name = name

            # Last resort: final path segment of the service endpoint,
            # e.g. https://myaccount.blob.core.windows.net/mycontainer/
            endpoint = root.get("ServiceEndpoint", "")
            if endpoint and not info.container_name:
                parts = [p for p in endpoint.rstrip("/").split("/") if p]
                if parts:
                    info.container_name = parts[-1]

cloudaudit.scanners.crawler

cloudaudit — Recursive File Crawler

Supports

  • AWS S3 XML listing (with IsTruncated / ContinuationToken pagination)
  • GCS XML listing (same schema, different namespace)
  • Azure Blob XML listing
  • Generic HTML directory listings (Apache, nginx, etc.)
  • CloudFront / CDN (falls back to HTML parsing)

Respects

  • max_depth recursion limit
  • ignore_paths filters
  • file size filtering (via HEAD requests when sizes are in XML)

FileCrawler

Recursively discover all objects/files exposed in a cloud storage endpoint.

Strategy selection is automatic based on the detected container type.

Source code in cloudaudit/scanners/crawler.py
class FileCrawler:
    """
    Recursively discover all objects/files exposed in a cloud storage endpoint.

    Strategy selection is automatic based on the detected container type.
    """

    def __init__(self, config: AuditConfig) -> None:
        """Store the audit configuration and initialise empty crawl state."""
        self._config    = config
        self._classifier = FileClassifier()
        self._discovered: List[ExposedFile] = []
        self._visited_urls: Set[str] = set()

    async def crawl(
        self,
        http: HTTPClient,
        base_url: str,
        container_type: ContainerType,
    ) -> List[ExposedFile]:
        """
        Entry point. Returns list of all discovered ExposedFile objects.

        Crawl state is reset on every call.  S3 and GCS use the XML listing
        crawler, Azure Blob uses its own XML crawler, and every other
        container type falls back to HTML link crawling.
        """
        self._discovered = []
        self._visited_urls = set()

        if container_type in (ContainerType.AWS_S3, ContainerType.GCS):
            await self._crawl_s3_xml(http, base_url)
        elif container_type == ContainerType.AZURE_BLOB:
            await self._crawl_azure_xml(http, base_url)
        else:
            # Generic HTML directory listing or CloudFront
            await self._crawl_html(http, base_url, depth=0)

        logger.info("Crawler finished: %d files discovered", len(self._discovered))
        return self._discovered

    # ── AWS S3 / GCS XML crawler ───────────────────────────────────────────────

    async def _crawl_s3_xml(self, http: HTTPClient, base_url: str) -> None:
        """
        Parse S3 ListBucketResult XML.
        Handles IsTruncated=true pagination automatically via ContinuationToken.
        Also uses CommonPrefixes for virtual folder enumeration.

        Prefixes are processed breadth-first and each is listed at most once.
        A failed request or non-200 status abandons the current prefix only.
        """
        from collections import deque
        from urllib.parse import quote

        listing_root = base_url.rstrip("/")
        pending = deque([""])   # start at root prefix
        processed_prefixes: Set[str] = set()

        while pending:
            prefix = pending.popleft()
            if prefix in processed_prefixes:
                continue
            processed_prefixes.add(prefix)

            continuation_token: Optional[str] = None  # reset per prefix
            while True:
                # Build the listing URL.  Prefixes and tokens are opaque
                # strings that may contain '&', '+', '=' or '/', so they must
                # be percent-encoded to survive inside the query string.
                params = "list-type=2&delimiter=%2F"
                if prefix:
                    params += f"&prefix={quote(prefix, safe='')}"
                if continuation_token:
                    params += f"&continuation-token={quote(continuation_token, safe='')}"

                list_url = f"{listing_root}/?{params}"
                logger.debug("S3 listing: %s", list_url)

                try:
                    resp = await http.get(list_url)
                    if resp.status != 200:
                        logger.warning("S3 listing returned HTTP %d for prefix=%r", resp.status, prefix)
                        break
                    body = await resp.text(errors="replace")
                except Exception as exc:
                    logger.debug("S3 listing error (prefix=%r): %s", prefix, exc)
                    break

                is_truncated, token, keys, sub_prefixes = self._parse_s3_xml(body, base_url)

                self._discovered.extend(ef for ef in keys if self._should_include(ef))

                # Queue sub-prefixes (virtual folders) for recursive crawling
                pending.extend(sp for sp in sub_prefixes if sp not in processed_prefixes)

                # Stop when the listing is complete, or when the server hands
                # back the token we just sent (it would loop forever).
                if not (is_truncated and token) or token == continuation_token:
                    break
                continuation_token = token

    def _parse_s3_xml(
        self, body: str, base_url: str
    ) -> Tuple[bool, Optional[str], List[ExposedFile], List[str]]:
        """
        Returns (is_truncated, continuation_token, [ExposedFile], [sub_prefixes]).
        Handles both old-style (Marker) and new ListObjectsV2 (ContinuationToken).

        A body that is not well-formed XML yields ``(False, None, [], [])``.
        """
        files: List[ExposedFile] = []
        sub_prefixes: List[str] = []
        is_truncated = False
        token: Optional[str] = None

        try:
            root = ET.fromstring(body)
        except ET.ParseError as exc:
            logger.debug("XML parse error: %s", exc)
            return False, None, files, sub_prefixes

        # Detect namespace
        tag = root.tag
        ns = ""
        if tag.startswith("{"):
            ns = tag[1:].split("}")[0]

        def _find(el: ET.Element, name: str) -> Optional[ET.Element]:
            return el.find(f"{{{ns}}}{name}" if ns else name)

        def _findall(el: ET.Element, name: str) -> list[ET.Element]:
            return el.findall(f"{{{ns}}}{name}" if ns else name)

        def _text(el: ET.Element, name: str) -> str:
            child = _find(el, name)
            return child.text.strip() if child is not None and child.text else ""

        # IsTruncated
        is_truncated = _text(root, "IsTruncated").lower() == "true"

        # ContinuationToken (v2) or NextMarker (v1).
        # Compare text, not Element truthiness: an Element without children
        # is falsy, so `a or b` would always throw the v2 token away.
        for token_tag in ("NextContinuationToken", "NextMarker"):
            value = _text(root, token_tag)
            if value:
                token = value
                break

        # Contents (files)
        for content in _findall(root, "Contents"):
            key  = _text(content, "Key")
            size = _text(content, "Size")
            mtime = _text(content, "LastModified")
            etag = _text(content, "ETag").strip('"')

            if not key or key.endswith("/"):
                continue   # skip directory markers

            url = self._key_to_url(base_url, key)
            files.append(ExposedFile(
                url=url,
                key=key,
                size_bytes=int(size) if size.isdigit() else 0,
                last_modified=mtime,
                file_type=self._classifier.classify(url),
                etag=etag,
            ))

        # CommonPrefixes (virtual directories → recurse into them)
        for cp in _findall(root, "CommonPrefixes"):
            sub_prefix = _text(cp, "Prefix")
            if sub_prefix:
                sub_prefixes.append(sub_prefix)

        return is_truncated, token, files, sub_prefixes

    # ── Azure Blob XML crawler ─────────────────────────────────────────────────

    async def _crawl_azure_xml(self, http: HTTPClient, base_url: str) -> None:
        """
        Azure Blob listing uses ?restype=container&comp=list.

        Follows ``NextMarker`` until the listing is exhausted.  A failed
        request or non-200 status ends the crawl.
        """
        from urllib.parse import quote

        listing_root = base_url.rstrip("/")
        marker: Optional[str] = None
        while True:
            params = "restype=container&comp=list"
            if marker:
                # Markers are opaque; encode so '&', '+', '=' survive the query.
                params += f"&marker={quote(marker, safe='')}"
            list_url = f"{listing_root}?{params}"
            try:
                resp = await http.get(list_url)
                if resp.status != 200:
                    break
                body = await resp.text(errors="replace")
            except Exception as exc:
                logger.debug("Azure listing error: %s", exc)
                break

            new_files, next_marker = self._parse_azure_xml(body, base_url)
            self._discovered.extend(ef for ef in new_files if self._should_include(ef))

            # Stop at the last page, or if the server repeats the marker.
            if not next_marker or next_marker == marker:
                break
            marker = next_marker

    def _parse_azure_xml(
        self, body: str, base_url: str
    ) -> Tuple[List[ExposedFile], Optional[str]]:
        """
        Parse an Azure blob listing into ``([ExposedFile], next_marker)``.

        Returns ``([], None)`` when the body is not well-formed XML or has no
        ``<Blobs>`` element.  A non-numeric ``Content-Length`` is recorded as 0.
        """
        files: List[ExposedFile] = []
        next_marker: Optional[str] = None

        try:
            root = ET.fromstring(body)
        except ET.ParseError:
            return files, None

        blobs = root.find("Blobs")
        if blobs is None:
            return files, None

        for blob in blobs.findall("Blob"):
            name_el = blob.find("Name")
            if name_el is None or not name_el.text:
                continue
            key = name_el.text.strip()

            props = blob.find("Properties")
            size = 0
            mtime = ""
            if props is not None:
                size_el  = props.find("Content-Length")
                mtime_el = props.find("Last-Modified")
                size_text = (size_el.text or "").strip() if size_el is not None else ""
                # Guard against malformed sizes instead of raising ValueError.
                size  = int(size_text) if size_text.isdigit() else 0
                mtime = mtime_el.text if mtime_el is not None and mtime_el.text else ""

            url = self._key_to_url(base_url, key)
            files.append(ExposedFile(
                url=url,
                key=key,
                size_bytes=size,
                last_modified=mtime,
                file_type=self._classifier.classify(url),
            ))

        marker_el = root.find("NextMarker")
        if marker_el is not None and marker_el.text:
            next_marker = marker_el.text.strip()

        return files, next_marker

    # ── Generic HTML directory listing crawler ─────────────────────────────────

    async def _crawl_html(
        self, http: HTTPClient, url: str, depth: int
    ) -> None:
        """
        Recursively crawl an HTML directory listing (Apache/nginx style).

        Stops once ``depth`` exceeds ``config.max_depth`` and never visits the
        same URL twice.  Sub-directories are crawled concurrently; a failure
        in one is logged and does not abort the others.
        """
        if depth > self._config.max_depth:
            return
        if url in self._visited_urls:
            return
        self._visited_urls.add(url)

        try:
            resp = await http.get(url)
            if resp.status != 200:
                return
            html = await resp.text(errors="replace")
        except Exception as exc:
            logger.debug("HTML crawl error at %s: %s", url, exc)
            return

        files, dirs = self._parse_html_links(html, url)

        for file_url, key in files:
            ef = ExposedFile(
                url=file_url,
                key=key,
                file_type=self._classifier.classify(file_url),
            )
            if self._should_include(ef):
                self._discovered.append(ef)

        outcomes = await asyncio.gather(
            *(self._crawl_html(http, dir_url, depth + 1) for dir_url in dirs),
            return_exceptions=True,
        )
        # gather() hands exceptions back as results; surface them in the log
        # rather than discarding them silently.
        for dir_url, outcome in zip(dirs, outcomes):
            if isinstance(outcome, BaseException):
                logger.debug("HTML crawl error at %s: %s", dir_url, outcome)

    def _parse_html_links(
        self, html: str, base_url: str
    ) -> Tuple[List[Tuple[str, str]], List[str]]:
        """
        Return ([(file_url, key), ...], [dir_url, ...]).

        Links are resolved against ``base_url``.  Skipped: empty, parent and
        self links, query-only or fragment-only links (e.g. column-sort links),
        non-HTTP schemes, links matching ``config.ignore_paths``, and links to
        other hosts.  An href ending in "/" is treated as a directory.
        """
        files: List[Tuple[str, str]] = []
        dirs:  List[str] = []
        seen:  Set[str]  = set()
        base_netloc = urlparse(base_url).netloc
        ignore_paths = self._config.ignore_paths

        for m in _HREF_RE.finditer(html):
            href = m.group(1).strip()
            if href in ("", "../", "./", "#") or href in seen:
                continue
            # "?C=N;O=D"-style sort links and "#anchor" links point back at the
            # listing page itself; they are not files.
            if href.startswith(("?", "#")):
                continue
            if any(p in href for p in ignore_paths):
                continue
            parsed = urlparse(href)
            # mailto:, javascript:, ftp: … cannot be fetched by the crawler.
            if parsed.scheme and parsed.scheme not in ("http", "https"):
                continue
            # Skip absolute links to different hosts
            if parsed.netloc and parsed.netloc != base_netloc:
                continue
            seen.add(href)

            full_url = urljoin(base_url, href)
            key      = urlparse(full_url).path.lstrip("/")

            if href.endswith("/"):
                dirs.append(full_url)
            else:
                files.append((full_url, key))

        return files, dirs

    # ── Helpers ────────────────────────────────────────────────────────────────

    def _key_to_url(self, base_url: str, key: str) -> str:
        """Join ``base_url`` and ``key`` with exactly one slash between them."""
        return base_url.rstrip("/") + "/" + key.lstrip("/")

    def _should_include(self, ef: ExposedFile) -> bool:
        """Return True if this file should be included in the audit scope."""
        ext = Path(ef.key).suffix.lower()

        # Skip binary media — no content to analyse
        if ext in SKIP_DOWNLOAD_EXTENSIONS:
            return False

        # Skip configured ignore paths
        if any(p in ef.key for p in self._config.ignore_paths):
            return False

        # If caller specified explicit extension allow-list, enforce it
        # (leading dots are ignored on both sides of the comparison).
        if self._config.extensions:
            allowed = {e.lstrip(".") for e in self._config.extensions}
            if ext.lstrip(".") not in allowed:
                return False

        # Skip files that are too large (we know size from XML listing);
        # a size of 0 means "unknown" and is let through.
        if ef.size_bytes and ef.size_bytes > self._config.max_file_size:
            logger.debug("Skipping %s — too large (%d bytes)", ef.key, ef.size_bytes)
            return False

        return True

crawl(http, base_url, container_type) async

Entry point. Returns list of all discovered ExposedFile objects.

Source code in cloudaudit/scanners/crawler.py
async def crawl(
    self,
    http: HTTPClient,
    base_url: str,
    container_type: ContainerType,
) -> List[ExposedFile]:
    """
    Entry point. Returns list of all discovered ExposedFile objects.

    Resets the crawl state, then runs the crawler that matches
    ``container_type``: the S3-style XML crawler for AWS S3 and GCS, the
    Azure XML crawler for Azure Blob, and the HTML crawler otherwise.
    """
    # Start every crawl from a clean slate.
    self._discovered = []
    self._visited_urls = set()

    uses_s3_schema = container_type in (ContainerType.AWS_S3, ContainerType.GCS)
    if uses_s3_schema:
        job = self._crawl_s3_xml(http, base_url)
    elif container_type == ContainerType.AZURE_BLOB:
        job = self._crawl_azure_xml(http, base_url)
    else:
        # Generic HTML directory listing or CloudFront
        job = self._crawl_html(http, base_url, depth=0)
    await job

    found = self._discovered
    logger.info("Crawler finished: %d files discovered", len(found))
    return found