Atna AI Labs

Getting Started

Introduction

Getting Started

Introduction

Introduction

Atna AI is an intelligent execution and strategy engine built for fast-moving crypto markets. Designed for high-risk environments, Atna enables users to generate structured trading plans across memecoins, altcoins, and major tokens, all tailored to capital, time horizon, and risk profile.

Atna leverages contextual prompts, market-aware logic, and portfolio-ready outputs to generate actionable investment frameworks. The system is optimized for short-term volatility, narrative-driven entries, and collaborative execution across teams or shared wallets.

Installation

Node.js 16.8 or later. macOS, Windows (including WSL), and Linux are supported.

Atna AI is an intelligent execution and strategy engine built for fast-moving crypto markets. Designed for high-risk environments, Atna enables users to generate structured trading plans across memecoins, altcoins, and major tokens, all tailored to capital, time horizon, and risk profile.

Atna leverages contextual prompts, market-aware logic, and portfolio-ready outputs to generate actionable investment frameworks. The system is optimized for short-term volatility, narrative-driven entries, and collaborative execution across teams or shared wallets.

System Architecture & Design Philosophy

Atna AI is built for the edge — where attention spans are short, volatility is constant, and decisions happen faster than most systems can respond. To handle the pace and complexity of the memecoin market, Atna’s architecture is modeled after decentralized trading cells: small, specialized units that each perform one task extremely well and report back instantly. This modular system is built around three core components:

Construction

Core Orchestrator

At the heart of Atna is the Strategy Core, a lightweight orchestration engine that: Interprets user input (capital, timeline, goals) Activates relevant analysis modules Routes output to a unified schema for strategy execution The Core is designed to stay out of the way. It doesn’t make assumptions — it delegates.

Specialized Micro-Agents

Instead of one monolithic AI, Atna operates through parallel micro-agents, each trained to handle specific aspects of crypto speculation: Trend Watcher – Tracks emerging narratives across X, DEX volumes, and Telegram buzz Entry Scanner – Surfaces tokens with unusual activity: low FDV + sudden attention Risk Splitter – Allocates capital dynamically based on volatility, hype age, and liquidity Rotation Timer – Suggests optimal entry/exit points using behavioral timing heuristics Team Planner – Structures collaboration, assigns user roles (e.g., scanner, executor) Each agent thinks independently but speaks in the same schema, making their output easy to merge and deploy.

Output Composer

All agent outputs are compiled into a structured strategy doc — not just a list of tokens, but a full tactical plan. This includes: Token picks with reason Allocation strategy Suggested tracking setup (wallets, tags, timing) Team roles for shared trading (if needed) Visual prompt and mood for strategy snapshot The entire output is formatted as raw JSON for direct use in dashboards, bots, or execution layers.

Running a simlar Model

The workflow is kicked off by sending a user request to the Head Portfolio Manager (PM) agent. The PM agent orchestrates the entire process, delegating to specialist agents and tools as needed. You can monitor the workflow in real time using OpenAI Traces, which provide detailed visibility into every agent and tool call.

# Install required dependencies
!pip install -r requirements.txt
import os

missing = []
if not os.environ.get('OPENAI_API_KEY'):
    missing.append('OPENAI_API_KEY')
if not os.environ.get('FRED_API_KEY'):
    missing.append('FRED_API_KEY')

if missing:
    print(f"Missing environment variable(s): {', '.join(missing)}. Please set them before running the workflow.")
else:
    print("All required API keys are set.")
import datetime
import json
import os
from pathlib import Path
from contextlib import AsyncExitStack
from agents import Runner, add_trace_processor, trace
from agents.tracing.processors import BatchTraceProcessor
from utils import FileSpanExporter, output_file
from investment_agents.config import build_investment_agents
import asyncio

add_trace_processor(BatchTraceProcessor(FileSpanExporter()))

async def run_workflow():
    if "OPENAI_API_KEY" not in os.environ:
        raise EnvironmentError("OPENAI_API_KEY not set — set it as an environment variable before running.")

    today_str = datetime.date.today().strftime("%B %d, %Y")
    question = (
        f"Today is {today_str}. "
        "How would the planned interest rate reduction effect my holdings in GOOGL if they were to happen?"
        "Considering all the factors effecting its price right now (Macro, Technical, Fundamental, etc.), what is a realistic price target by the end of the year?"
    )
    bundle = build_investment_agents()

    async with AsyncExitStack() as stack:
        for agent in [getattr(bundle, "fundamental", None), getattr(bundle, "quant", None)]:
            if agent is None:
                continue
            for server in getattr(agent, "mcp_servers", []):
                await server.connect()
                await stack.enter_async_context(server)

        print("Running multi-agent workflow with tracing enabled...\n")
        with trace(
            "Investment Research Workflow",
            metadata={"question": question[:512]}
        ) as workflow_trace:
            print(
                f"\n🔗 View the trace in the OpenAI console: "
                f"https://platform.openai.com/traces/trace?trace_id={workflow_trace.trace_id}\n"
            )

            response = None
            try:
                response = await asyncio.wait_for(
                    Runner.run(bundle.head_pm, question, max_turns=40),
                    timeout=1200
                )
            except asyncio.TimeoutError:
                print("\n❌ Workflow timed out after 20 minutes.")

            report_path = None
            try:
                if hasattr(response, 'final_output'):
                    output = response.final_output
                    if isinstance(output, str):
                        data = json.loads(output)
                        if isinstance(data, dict) and 'file' in data:
                            report_path = output_file(data['file'])
            except Exception as e:
                print(f"Could not parse investment report path: {e}")

            print(f"Workflow Completed Response from Agent: {response.final_output if hasattr(response, 'final_output') else response}, investment report created: {report_path if report_path else '[unknown]'}")

# In a Jupyter notebook cell, run:
await run_workflow()