Tools
Tools extend agent capabilities by providing access to external systems, APIs, and functions. They enable agents to perform actions beyond text generation.
🛠️ What are Tools?
Tools in Niflheim-X allow agents to:
- 🌐 Access external APIs and web services
- 💾 Read and write files and databases
- 🧮 Perform calculations and data processing
- 📊 Generate charts and visualizations
- 🔧 Execute system commands safely
- 🔍 Search knowledge bases and documentation
🏗️ Tool Architecture
graph TD
A[Agent Decision] --> B[Tool Selection]
B --> C[Parameter Validation]
C --> D[Tool Execution]
D --> E[Result Processing]
E --> F[Agent Context Update]
F --> G[Response Generation]
H[Tool Registry] --> B
I[Security Layer] --> D
J[Error Handling] --> E
🚀 Creating Tools
Simple Function Tool
from niflheim_x import Tool
def calculate_tip(bill_amount: float, tip_percentage: float = 15.0) -> str:
"""Calculate tip amount and total bill."""
tip = bill_amount * (tip_percentage / 100)
total = bill_amount + tip
return f"Tip: ${tip:.2f}, Total: ${total:.2f}"
# Create tool from function
tip_calculator = Tool.from_function(
function=calculate_tip,
name="tip_calculator",
description="Calculate tip and total amount for a bill"
)
# Register with agent
agent.register_tool(tip_calculator)
Advanced Tool Definition
from niflheim_x import Tool
import requests
import json
def search_weather(city: str, country: str = "US") -> str:
"""Get current weather for a city."""
try:
api_key = "your-weather-api-key"
url = f"http://api.openweathermap.org/data/2.5/weather"
params = {
"q": f"{city},{country}",
"appid": api_key,
"units": "metric"
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
weather = data["weather"][0]["description"]
temp = data["main"]["temp"]
return f"Weather in {city}: {weather}, {temp}°C"
except Exception as e:
return f"Error getting weather: {str(e)}"
# Create tool with detailed schema
weather_tool = Tool(
name="search_weather",
description="Get current weather conditions for any city worldwide",
function=search_weather,
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "Name of the city"
},
"country": {
"type": "string",
"description": "Country code (default: US)",
"default": "US"
}
},
"required": ["city"]
},
examples=[
{
"input": {"city": "London", "country": "UK"},
"output": "Weather in London: cloudy, 15°C"
}
]
)
agent.register_tool(weather_tool)
🎯 Tool Categories
1. Data Processing Tools
import pandas as pd
from typing import Dict, Any
def analyze_data(data: str, analysis_type: str = "summary") -> str:
"""Analyze CSV data and return insights."""
try:
# Parse CSV data
from io import StringIO
df = pd.read_csv(StringIO(data))
if analysis_type == "summary":
return df.describe().to_string()
elif analysis_type == "correlation":
return df.corr().to_string()
elif analysis_type == "info":
return df.info()
else:
return "Unsupported analysis type"
except Exception as e:
return f"Error analyzing data: {str(e)}"
data_analyzer = Tool.from_function(
function=analyze_data,
name="analyze_data",
description="Analyze CSV data and provide statistical insights"
)
2. File System Tools
import os
from pathlib import Path
def read_file(file_path: str, max_lines: int = 100) -> str:
"""Read contents of a text file safely."""
try:
path = Path(file_path)
# Security check
if not path.exists() or not path.is_file():
return "File not found or is not a regular file"
# Size check (max 1MB)
if path.stat().st_size > 1024 * 1024:
return "File too large (max 1MB)"
with open(path, 'r', encoding='utf-8') as f:
lines = f.readlines()[:max_lines]
content = ''.join(lines)
if len(lines) == max_lines:
content += f"\n... (showing first {max_lines} lines)"
return content
except Exception as e:
return f"Error reading file: {str(e)}"
def write_file(file_path: str, content: str, append: bool = False) -> str:
"""Write content to a file safely."""
try:
path = Path(file_path)
# Security check - only allow writes to specific directories
allowed_dirs = [Path("./data"), Path("./output")]
if not any(path.is_relative_to(allowed_dir) for allowed_dir in allowed_dirs):
return "Write access denied - file must be in allowed directories"
# Ensure directory exists
path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if append else 'w'
with open(path, mode, encoding='utf-8') as f:
f.write(content)
return f"Successfully {'appended to' if append else 'wrote'} {file_path}"
except Exception as e:
return f"Error writing file: {str(e)}"
# Register file tools
agent.register_tool(Tool.from_function(read_file, name="read_file"))
agent.register_tool(Tool.from_function(write_file, name="write_file"))
3. Web API Tools
import requests
from urllib.parse import quote
def search_web(query: str, num_results: int = 5) -> str:
"""Search the web and return results."""
try:
# Using a hypothetical search API
api_key = "your-search-api-key"
url = "https://api.example-search.com/search"
params = {
"q": query,
"key": api_key,
"num": num_results
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
results = response.json()
formatted_results = []
for i, result in enumerate(results.get("items", []), 1):
title = result.get("title", "No title")
snippet = result.get("snippet", "No description")
link = result.get("link", "")
formatted_results.append(f"{i}. {title}\n {snippet}\n {link}\n")
return "\n".join(formatted_results)
except Exception as e:
return f"Error searching web: {str(e)}"
def send_email(to: str, subject: str, body: str) -> str:
"""Send an email via API."""
try:
# Using a hypothetical email API
api_key = "your-email-api-key"
url = "https://api.example-email.com/send"
payload = {
"to": to,
"subject": subject,
"body": body,
"from": "bot@yourcompany.com"
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
response.raise_for_status()
return f"Email sent successfully to {to}"
except Exception as e:
return f"Error sending email: {str(e)}"
# Register web tools
agent.register_tool(Tool.from_function(search_web, name="search_web"))
agent.register_tool(Tool.from_function(send_email, name="send_email"))
4. Database Tools
import sqlite3
from typing import List, Dict, Any
class DatabaseTool:
def __init__(self, db_path: str):
self.db_path = db_path
def query_database(self, sql: str, params: List[Any] = None) -> str:
"""Execute a SELECT query on the database."""
try:
# Security: Only allow SELECT statements
if not sql.strip().upper().startswith('SELECT'):
return "Error: Only SELECT queries are allowed"
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
results = cursor.fetchall()
if not results:
return "No results found"
# Convert to readable format
headers = list(results[0].keys())
rows = [dict(row) for row in results]
output = []
output.append(" | ".join(headers))
output.append("-" * (len(" | ".join(headers))))
for row in rows[:50]: # Limit to 50 rows
values = [str(row[header]) for header in headers]
output.append(" | ".join(values))
if len(results) > 50:
output.append(f"... ({len(results)} total rows)")
return "\n".join(output)
except Exception as e:
return f"Database error: {str(e)}"
# Create database tool
db_tool = DatabaseTool("./data/app.db")
agent.register_tool(Tool.from_function(
function=db_tool.query_database,
name="query_database",
description="Query the application database with SQL SELECT statements"
))
🔧 Tool Registry
Organize and manage multiple tools:
from niflheim_x import ToolRegistry
# Create tool registry
registry = ToolRegistry()
# Add tools
registry.register(weather_tool)
registry.register(tip_calculator)
registry.register(data_analyzer)
# Create agent with registry
agent = Agent(
name="ToolBot",
llm=llm,
tool_registry=registry
)
# Registry methods
all_tools = registry.list_tools()
weather = registry.get_tool("search_weather")
registry.unregister("old_tool")
# Bulk registration
file_tools = [
Tool.from_function(read_file, name="read_file"),
Tool.from_function(write_file, name="write_file"),
Tool.from_function(list_files, name="list_files")
]
registry.register_bulk(file_tools)
🛡️ Tool Security
Input Validation
from niflheim_x import Tool
from typing import Union
import re
def secure_calculator(expression: str) -> Union[str, float]:
"""Safely evaluate mathematical expressions."""
# Whitelist allowed characters
allowed_chars = re.compile(r'^[0-9+\-*/().\s]+$')
if not allowed_chars.match(expression):
return "Error: Invalid characters in expression"
# Additional security checks
if len(expression) > 100:
return "Error: Expression too long"
try:
# Use eval with restricted globals for safety
result = eval(expression, {"__builtins__": {}}, {})
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
calculator_tool = Tool.from_function(
function=secure_calculator,
name="calculator",
description="Safely evaluate mathematical expressions"
)
Access Control
from niflheim_x import Tool
from functools import wraps
def require_permission(permission: str):
"""Decorator to check permissions before tool execution."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Check if current user has permission
current_user = kwargs.get('_user_context', {})
user_permissions = current_user.get('permissions', [])
if permission not in user_permissions:
return f"Access denied: Requires '{permission}' permission"
return func(*args, **kwargs)
return wrapper
return decorator
@require_permission('file_write')
def protected_file_write(file_path: str, content: str) -> str:
"""Write file with permission check."""
# Implementation here
pass
# Register with permission metadata
protected_tool = Tool(
name="protected_write",
description="Write files (requires file_write permission)",
function=protected_file_write,
metadata={"required_permission": "file_write"}
)
📊 Tool Monitoring
Tool Usage Analytics
from niflheim_x import Tool, ToolRegistry
import time
from functools import wraps
def monitor_tool_usage(func):
"""Monitor tool execution time and success rate."""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
tool_name = func.__name__
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# Log successful execution
print(f"Tool '{tool_name}' executed successfully in {execution_time:.2f}s")
return result
except Exception as e:
execution_time = time.time() - start_time
# Log failed execution
print(f"Tool '{tool_name}' failed after {execution_time:.2f}s: {str(e)}")
raise
return wrapper
# Apply monitoring to tools
@monitor_tool_usage
def monitored_weather_tool(city: str) -> str:
# Tool implementation
pass
Tool Performance Metrics
class ToolMetrics:
def __init__(self):
self.usage_stats = {}
def record_usage(self, tool_name: str, execution_time: float, success: bool):
if tool_name not in self.usage_stats:
self.usage_stats[tool_name] = {
'total_calls': 0,
'successful_calls': 0,
'total_time': 0,
'average_time': 0
}
stats = self.usage_stats[tool_name]
stats['total_calls'] += 1
stats['total_time'] += execution_time
stats['average_time'] = stats['total_time'] / stats['total_calls']
if success:
stats['successful_calls'] += 1
def get_report(self) -> str:
report = ["Tool Usage Report:", "=" * 20]
for tool_name, stats in self.usage_stats.items():
success_rate = (stats['successful_calls'] / stats['total_calls']) * 100
report.append(f"{tool_name}:")
report.append(f" Calls: {stats['total_calls']}")
report.append(f" Success Rate: {success_rate:.1f}%")
report.append(f" Avg Time: {stats['average_time']:.2f}s")
report.append("")
return "\n".join(report)
# Use metrics
metrics = ToolMetrics()
# Record usage in tool wrappers
🎯 Best Practices
1. Tool Design Principles
# ✅ Good: Clear, focused functionality
def format_date(date_string: str, format_type: str = "iso") -> str:
"""Format a date string to specified format."""
# Implementation
# ❌ Bad: Too many responsibilities
def do_everything(input_data: str, action: str, options: dict) -> str:
"""Handles multiple unrelated operations."""
# Implementation
2. Error Handling
def robust_tool(input_data: str) -> str:
"""Example of robust error handling."""
try:
# Validate input
if not input_data or len(input_data) > 1000:
return "Error: Invalid input data"
# Process data
result = process_data(input_data)
# Validate output
if not result:
return "Warning: No results generated"
return result
except ValueError as e:
return f"Input error: {str(e)}"
except requests.RequestException as e:
return f"Network error: {str(e)}"
except Exception as e:
return f"Unexpected error: {str(e)}"
3. Documentation
def well_documented_tool(
query: str,
max_results: int = 10,
include_metadata: bool = False
) -> str:
"""
Search for information with comprehensive options.
Args:
query: Search query string (required)
max_results: Maximum number of results (1-100, default: 10)
include_metadata: Include additional metadata in results
Returns:
Formatted search results or error message
Examples:
>>> search("Python programming", max_results=5)
>>> search("machine learning", include_metadata=True)
"""
# Implementation with clear logic
pass
4. Testing Tools
import unittest
from unittest.mock import patch, MagicMock
class TestWeatherTool(unittest.TestCase):
def test_valid_city(self):
result = search_weather("London", "UK")
self.assertIn("Weather in London", result)
def test_invalid_city(self):
result = search_weather("InvalidCity123")
self.assertIn("Error", result)
@patch('requests.get')
def test_api_failure(self, mock_get):
mock_get.side_effect = requests.RequestException("API Error")
result = search_weather("London")
self.assertIn("Error getting weather", result)
# Run tests
if __name__ == '__main__':
unittest.main()
🚀 Next Steps
- Learn about LLM Providers for different AI models
- Explore Enterprise Features for production monitoring
- Check out Advanced Guides for optimization techniques
- See Tool Examples for more complex implementations