Contributing to AutoGPT Agent Server: Creating and Testing Blocks
This guide will walk you through the process of creating and testing a new block for the AutoGPT Agent Server, using the WikipediaSummaryBlock as an example.
Understanding Blocks and Testing
Blocks are reusable components that can be connected to form a graph representing an agent's behavior. Each block has inputs, outputs, and a specific function. Proper testing is crucial to ensure blocks work correctly and consistently.
Creating and Testing a New Block
Follow these steps to create and test a new block:
- Create a new Python file for your block in the autogpt_platform/backend/backend/blocks directory. Name it descriptively and use snake_case. For example: get_wikipedia_summary.py.
- Import necessary modules and create a class that inherits from Block. Make sure to include all necessary imports for your block.

  Every block should contain the following:

      from backend.data.block import Block, BlockSchema, BlockOutput

  Example for the Wikipedia summary block:

      from backend.data.block import Block, BlockSchema, BlockOutput
      from backend.utils.get_request import GetRequest
      import requests

      class WikipediaSummaryBlock(Block, GetRequest):
          # Block implementation will go here
- Define the input and output schemas using BlockSchema. These schemas specify the data structure that the block expects to receive (input) and produce (output).
  - The input schema defines the structure of the data the block will process. Each field in the schema represents a required piece of input data.
  - The output schema defines the structure of the data the block will return after processing. Each field in the schema represents a piece of output data.

  Example:

      class Input(BlockSchema):
          topic: str  # The topic to get the Wikipedia summary for

      class Output(BlockSchema):
          summary: str  # The summary of the topic from Wikipedia
          error: str  # Any error message if the request fails; the error field must be named `error`.
- Implement the __init__ method, including test data and mocks:

  Important: Use a UUID generator (e.g. https://www.uuidgenerator.net/) for every new block id and do not make up your own. Alternatively, you can run this Python code to generate a UUID: print(__import__('uuid').uuid4())

      def __init__(self):
          super().__init__(
              # Unique ID for the block, used across users for templates
              # If you are an AI leave it as is or change to "generate-proper-uuid"
              id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
              input_schema=WikipediaSummaryBlock.Input,  # Assign input schema
              output_schema=WikipediaSummaryBlock.Output,  # Assign output schema
              # Provide sample input, output and test mock for testing the block
              test_input={"topic": "Artificial Intelligence"},
              test_output=("summary", "summary content"),
              test_mock={"get_request": lambda url, json: {"extract": "summary content"}},
          )
  - id: A unique identifier for the block.
  - input_schema and output_schema: Define the structure of the input and output data.
  Let's break down the testing components:

  - test_input: This is a sample input that will be used to test the block. It should be a valid input according to your Input schema.
  - test_output: This is the expected output when running the block with the test_input. It should match your Output schema. For non-deterministic outputs, or when you only want to assert the type, you can use Python types instead of specific values. For example, ("summary", str) asserts that the output key is "summary" and its value is a string.
  - test_mock: This is crucial for blocks that make network calls. It provides a mock function that replaces the actual network call during testing.

    In this case, we're mocking the get_request method to always return a dictionary with an 'extract' key, simulating a successful API response. This allows us to test the block's logic without making actual network requests, which could be slow, unreliable, or rate-limited.
- Implement the run method with error handling. This should contain the main logic of the block:

      def run(self, input_data: Input, **kwargs) -> BlockOutput:
          try:
              topic = input_data.topic
              url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic}"
              response = self.get_request(url, json=True)
              yield "summary", response['extract']
          except requests.exceptions.HTTPError as http_err:
              raise RuntimeError(f"HTTP error occurred: {http_err}")
  - Try block: Contains the main logic to fetch and process the Wikipedia summary.
  - API request: Send a GET request to the Wikipedia API.
  - Error handling: Handle the exceptions that might occur during the API request and data processing. We don't need to catch all exceptions, only the ones we expect and can handle. Uncaught exceptions are automatically yielded as error in the output. Any block that raises an exception (or yields an error output) is marked as failed. Prefer raising exceptions over yielding error, as raising stops the execution immediately.
  - Yield: Use yield to output the results. Prefer to output one result object at a time. If you are calling a function that returns a list, yield each item in the list separately. You can also yield the whole list, but do so in addition to yielding the individual items, not instead of it. For example: if you were writing a block that outputs emails, you'd yield each email as a separate result object, and you could also yield the whole list as an additional single result object. Yielding an output named error breaks the execution right away and marks the block execution as failed.
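The yield guidance and the type-based test_output check described above can be sketched with a plain generator. This is a minimal stand-in rather than the platform's own code: `run_email_block`, `matches`, and the output names `email` and `emails` are hypothetical. Only the (name, value) shape of each yielded result and the idea of using a Python type as an expected value come from the steps above.

```python
from typing import Any, Iterator


def run_email_block(emails: list[str]) -> Iterator[tuple[str, Any]]:
    """Hypothetical stand-in for a block's run method."""
    # Yield each item as its own result object...
    for email in emails:
        yield "email", email
    # ...and, optionally, the whole list as one additional result.
    yield "emails", emails


def matches(actual: tuple[str, Any], expected: tuple[str, Any]) -> bool:
    """Compare one output to one expectation; a type means 'any value of this type'."""
    (name, value), (exp_name, exp_value) = actual, expected
    if name != exp_name:
        return False
    if isinstance(exp_value, type):
        return isinstance(value, exp_value)
    return value == exp_value


outputs = list(run_email_block(["a@example.com", "b@example.com"]))
expected = [("email", "a@example.com"), ("email", str), ("emails", list)]
all_match = len(outputs) == len(expected) and all(
    matches(a, e) for a, e in zip(outputs, expected)
)
```

Mixing exact values and types in `expected` mirrors the two styles of test_output entry: pin the value where it is deterministic, and fall back to a type where it is not.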
Blocks with authentication
Our system supports auth offloading for API keys and OAuth2 authorization flows. Adding a block with API key authentication is straightforward, as is adding a block for a service that we already have OAuth2 support for.
Implementing the block itself is relatively simple. On top of the instructions above, you're going to add a credentials parameter to the Input model and the run method:
from typing import Literal

from backend.data.model import (
    APIKeyCredentials,
    OAuth2Credentials,
    Credentials,
    CredentialsField,
    CredentialsMetaInput,
)
from backend.data.block import Block, BlockOutput, BlockSchema
from backend.integrations.providers import ProviderName


# API Key auth:
class BlockWithAPIKeyAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is required or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["api_key"]
        ] = CredentialsField(
            description="The GitHub integration can be used with "
            "any API key with sufficient permissions for the blocks it is used on.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: APIKeyCredentials,
        **kwargs,
    ) -> BlockOutput:
        ...


# OAuth:
class BlockWithOAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is required or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["oauth2"]
        ] = CredentialsField(
            required_scopes={"repo"},
            description="The GitHub integration can be used with OAuth.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: OAuth2Credentials,
        **kwargs,
    ) -> BlockOutput:
        ...


# API Key auth + OAuth:
class BlockWithAPIKeyAndOAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is required or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["api_key", "oauth2"]
        ] = CredentialsField(
            required_scopes={"repo"},
            description="The GitHub integration can be used with OAuth, "
            "or any API key with sufficient permissions for the blocks it is used on.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: Credentials,
        **kwargs,
    ) -> BlockOutput:
        ...
The credentials will be automagically injected by the executor in the back end.
The APIKeyCredentials and OAuth2Credentials models are defined here.
To use them in e.g. an API request, you can either access the token directly:
# credentials: APIKeyCredentials
response = requests.post(
    url,
    headers={
        "Authorization": f"Bearer {credentials.api_key.get_secret_value()}",
    },
)

# credentials: OAuth2Credentials
response = requests.post(
    url,
    headers={
        "Authorization": f"Bearer {credentials.access_token.get_secret_value()}",
    },
)
or use the shortcut credentials.bearer():
# credentials: APIKeyCredentials | OAuth2Credentials
response = requests.post(
    url,
    headers={"Authorization": credentials.bearer()},
)
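To illustrate why the bearer() shortcut is convenient, here is a small self-contained sketch. It assumes bearer() simply formats the secret as an Authorization header value, which is what the two longer snippets above do by hand. `Secret`, `FakeAPIKeyCredentials`, `FakeOAuth2Credentials`, and `auth_headers` are hypothetical stand-ins, not the real models from the backend.

```python
from __future__ import annotations

from dataclasses import dataclass


class Secret:
    """Hypothetical stand-in for a secret-string wrapper with get_secret_value()."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value

    def __repr__(self) -> str:
        # Keep the raw value out of logs and tracebacks.
        return "Secret('**********')"


@dataclass
class FakeAPIKeyCredentials:
    api_key: Secret

    def bearer(self) -> str:
        return f"Bearer {self.api_key.get_secret_value()}"


@dataclass
class FakeOAuth2Credentials:
    access_token: Secret

    def bearer(self) -> str:
        return f"Bearer {self.access_token.get_secret_value()}"


def auth_headers(
    credentials: FakeAPIKeyCredentials | FakeOAuth2Credentials,
) -> dict[str, str]:
    """Build request headers without caring which credential type was injected."""
    return {"Authorization": credentials.bearer()}
```

A block that accepts both credential types can call `auth_headers(credentials)` once instead of branching on whether it received an API key or an OAuth2 access token.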
The ProviderName enum is the single source of truth for which providers exist in our system.
Naturally, to add an authenticated block for a new provider, you'll have to add it here too.
ProviderName definition
class ProviderName(str, Enum):
    ANTHROPIC = "anthropic"
    DISCORD = "discord"
    D_ID = "d_id"
    E2B = "e2b"
    EXA = "exa"
    FAL = "fal"
    GITHUB = "github"
    GOOGLE = "google"
    GOOGLE_MAPS = "google_maps"
    GROQ = "groq"
    HUBSPOT = "hubspot"
    IDEOGRAM = "ideogram"
    JINA = "jina"
    MEDIUM = "medium"
    NOTION = "notion"
    OLLAMA = "ollama"
    OPENAI = "openai"
    OPENWEATHERMAP = "openweathermap"
    OPEN_ROUTER = "open_router"
    PINECONE = "pinecone"
    REPLICATE = "replicate"
    REVID = "revid"
    SLANT3D = "slant3d"
    UNREAL_SPEECH = "unreal_speech"
Adding an OAuth2 service integration
To add support for a new OAuth2-authenticated service, you'll need to add an OAuthHandler.
All our existing handlers and the base class can be found here.
Every handler must implement the following parts of the BaseOAuthHandler interface:
PROVIDER_NAME: ClassVar[ProviderName]
DEFAULT_SCOPES: ClassVar[list[str]] = []

def __init__(self, client_id: str, client_secret: str, redirect_uri: str): ...
def get_login_url(self, scopes: list[str], state: str) -> str: ...
def exchange_code_for_tokens(
    self, code: str, scopes: list[str]
) -> OAuth2Credentials: ...
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials: ...
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool: ...
As you can see, this is modeled after the standard OAuth2 flow.
Aside from implementing the OAuthHandler itself, adding a handler into the system requires two more things:
- Adding the handler class to HANDLERS_BY_NAME under integrations/oauth/__init__.py

      HANDLERS_BY_NAME: dict["ProviderName", type["BaseOAuthHandler"]] = {
          handler.PROVIDER_NAME: handler
          for handler in [
              GitHubOAuthHandler,
              GoogleOAuthHandler,
              NotionOAuthHandler,
          ]
      }

- Adding {provider}_client_id and {provider}_client_secret to the application's Secrets under util/settings.py

      github_client_id: str = Field(default="", description="GitHub OAuth client ID")
      github_client_secret: str = Field(
          default="", description="GitHub OAuth client secret"
      )
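The two registration steps above fit together roughly as follows. This is a sketch under assumptions, not the platform's wiring: `BaseHandler`, `GitHubHandler`, `GoogleHandler`, the `secrets` dictionary, and `make_handler` are hypothetical. The parts taken from the text are the registry keyed by PROVIDER_NAME and the {provider}_client_id / {provider}_client_secret naming convention.

```python
from typing import ClassVar


class BaseHandler:
    """Hypothetical stand-in for BaseOAuthHandler; only the pieces needed here."""

    PROVIDER_NAME: ClassVar[str]

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str) -> None:
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri


class GitHubHandler(BaseHandler):
    PROVIDER_NAME = "github"


class GoogleHandler(BaseHandler):
    PROVIDER_NAME = "google"


# Same shape as HANDLERS_BY_NAME: each class registers under its own PROVIDER_NAME.
HANDLERS = {handler.PROVIDER_NAME: handler for handler in [GitHubHandler, GoogleHandler]}

# Hypothetical settings values, named after the {provider}_client_id convention.
secrets = {
    "github_client_id": "id-123",
    "github_client_secret": "s3cret",
}


def make_handler(provider: str, redirect_uri: str) -> BaseHandler:
    """Look up the handler class and feed it that provider's client ID and secret."""
    handler_cls = HANDLERS[provider]
    return handler_cls(
        client_id=secrets[f"{provider}_client_id"],
        client_secret=secrets[f"{provider}_client_secret"],
        redirect_uri=redirect_uri,
    )


handler = make_handler("github", "https://example.com/callback")
```

Because the registry key comes from the class attribute, adding a provider means adding one class to the list and two settings fields; no lookup code changes.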
Adding to the frontend
You will need to add the provider (api or oauth) to the CredentialsInput component in frontend/src/components/integrations/credentials-input.tsx.
export const providerIcons: Record<
  CredentialsProviderName,
  React.FC<{ className?: string }>
> = {
  anthropic: fallbackIcon,
  e2b: fallbackIcon,
  github: FaGithub,
  google: FaGoogle,
  groq: fallbackIcon,
  notion: NotionLogoIcon,
  discord: FaDiscord,
  d_id: fallbackIcon,
  google_maps: FaGoogle,
  jina: fallbackIcon,
  ideogram: fallbackIcon,
  medium: FaMedium,
  ollama: fallbackIcon,
  openai: fallbackIcon,
  openweathermap: fallbackIcon,
  open_router: fallbackIcon,
  pinecone: fallbackIcon,
  slant3d: fallbackIcon,
  replicate: fallbackIcon,
  fal: fallbackIcon,
  revid: fallbackIcon,
  unreal_speech: fallbackIcon,
  exa: fallbackIcon,
  hubspot: fallbackIcon,
};
You will also need to add the provider to the CredentialsProvider component in frontend/src/components/integrations/credentials-provider.tsx.
const providerDisplayNames: Record<CredentialsProviderName, string> = {
  anthropic: "Anthropic",
  discord: "Discord",
  d_id: "D-ID",
  e2b: "E2B",
  github: "GitHub",
  google: "Google",
  google_maps: "Google Maps",
  groq: "Groq",
  ideogram: "Ideogram",
  jina: "Jina",
  medium: "Medium",
  notion: "Notion",
  ollama: "Ollama",
  openai: "OpenAI",
  openweathermap: "OpenWeatherMap",
  open_router: "Open Router",
  pinecone: "Pinecone",
  slant3d: "Slant3D",
  replicate: "Replicate",
  fal: "FAL",
  revid: "Rev.ID",
  unreal_speech: "Unreal Speech",
  exa: "Exa",
  hubspot: "Hubspot",
} as const;
Finally, you will need to add the provider to the CredentialsType enum in frontend/src/lib/autogpt-server-api/types.ts.
export const PROVIDER_NAMES = {
  ANTHROPIC: "anthropic",
  D_ID: "d_id",
  DISCORD: "discord",
  E2B: "e2b",
  GITHUB: "github",
  GOOGLE: "google",
  GOOGLE_MAPS: "google_maps",
  GROQ: "groq",
  IDEOGRAM: "ideogram",
  JINA: "jina",
  MEDIUM: "medium",
  NOTION: "notion",
  OLLAMA: "ollama",
  OPENAI: "openai",
  OPENWEATHERMAP: "openweathermap",
  OPEN_ROUTER: "open_router",
  PINECONE: "pinecone",
  SLANT3D: "slant3d",
  REPLICATE: "replicate",
  FAL: "fal",
  REVID: "revid",
  UNREAL_SPEECH: "unreal_speech",
  EXA: "exa",
  HUBSPOT: "hubspot",
} as const;
Example: GitHub integration
- GitHub blocks with API key + OAuth2 support: blocks/github
class GithubCommentBlock(Block):
    class Input(BlockSchema):
        credentials: GithubCredentialsInput = GithubCredentialsField("repo")
        issue_url: str = SchemaField(
            description="URL of the GitHub issue or pull request",
            placeholder="https://github.com/owner/repo/issues/1",
        )
        comment: str = SchemaField(
            description="Comment to post on the issue or pull request",
            placeholder="Enter your comment",
        )

    class Output(BlockSchema):
        id: int = SchemaField(description="ID of the created comment")
        url: str = SchemaField(description="URL to the comment on GitHub")
        error: str = SchemaField(
            description="Error message if the comment posting failed"
        )

    def __init__(self):
        super().__init__(
            id="a8db4d8d-db1c-4a25-a1b0-416a8c33602b",
            description="This block posts a comment on a specified GitHub issue or pull request.",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=GithubCommentBlock.Input,
            output_schema=GithubCommentBlock.Output,
            test_input=[
                {
                    "issue_url": "https://github.com/owner/repo/issues/1",
                    "comment": "This is a test comment.",
                    "credentials": TEST_CREDENTIALS_INPUT,
                },
                {
                    "issue_url": "https://github.com/owner/repo/pull/1",
                    "comment": "This is a test comment.",
                    "credentials": TEST_CREDENTIALS_INPUT,
                },
            ],
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("id", 1337),
                ("url", "https://github.com/owner/repo/issues/1#issuecomment-1337"),
                ("id", 1337),
                (
                    "url",
                    "https://github.com/owner/repo/issues/1#issuecomment-1337",
                ),
            ],
            test_mock={
                "post_comment": lambda *args, **kwargs: (
                    1337,
                    "https://github.com/owner/repo/issues/1#issuecomment-1337",
                )
            },
        )

    @staticmethod
    def post_comment(
        credentials: GithubCredentials, issue_url: str, body_text: str
    ) -> tuple[int, str]:
        api = get_api(credentials)
        data = {"body": body_text}
        if "pull" in issue_url:
            issue_url = issue_url.replace("pull", "issues")
        comments_url = issue_url + "/comments"
        response = api.post(comments_url, json=data)
        comment = response.json()
        return comment["id"], comment["html_url"]

    def run(
        self,
        input_data: Input,
        *,
        credentials: GithubCredentials,
        **kwargs,
    ) -> BlockOutput:
        id, url = self.post_comment(
            credentials,
            input_data.issue_url,
            input_data.comment,
        )
        yield "id", id
        yield "url", url
- GitHub OAuth2 handler: integrations/oauth/github.py
class GitHubOAuthHandler(BaseOAuthHandler):
    """
    Based on the documentation at:
    - [Authorizing OAuth apps - GitHub Docs](https://docs.github.com/en/apps/oauth-apps/building-oauth-apps/authorizing-oauth-apps)
    - [Refreshing user access tokens - GitHub Docs](https://docs.github.com/en/apps/creating-github-apps/authenticating-with-a-github-app/refreshing-user-access-tokens)

    Notes:
    - By default, token expiration is disabled on GitHub Apps. This means the access
      token doesn't expire and no refresh token is returned by the authorization flow.
    - When token expiration gets enabled, any existing tokens will remain non-expiring.
    - When token expiration gets disabled, token refreshes will return a non-expiring
      access token *with no refresh token*.
    """  # noqa

    PROVIDER_NAME = ProviderName.GITHUB

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_base_url = "https://github.com/login/oauth/authorize"
        self.token_url = "https://github.com/login/oauth/access_token"
        self.revoke_url = "https://api.github.com/applications/{client_id}/token"

    def get_login_url(self, scopes: list[str], state: str) -> str:
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": " ".join(scopes),
            "state": state,
        }
        return f"{self.auth_base_url}?{urlencode(params)}"

    def exchange_code_for_tokens(
        self, code: str, scopes: list[str]
    ) -> OAuth2Credentials:
        return self._request_tokens({"code": code, "redirect_uri": self.redirect_uri})

    def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
        if not credentials.access_token:
            raise ValueError("No access token to revoke")

        headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        requests.delete(
            url=self.revoke_url.format(client_id=self.client_id),
            auth=(self.client_id, self.client_secret),
            headers=headers,
            json={"access_token": credentials.access_token.get_secret_value()},
        )
        return True

    def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
        if not credentials.refresh_token:
            return credentials

        return self._request_tokens(
            {
                "refresh_token": credentials.refresh_token.get_secret_value(),
                "grant_type": "refresh_token",
            }
        )

    def _request_tokens(
        self,
        params: dict[str, str],
        current_credentials: Optional[OAuth2Credentials] = None,
    ) -> OAuth2Credentials:
        request_body = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            **params,
        }
        headers = {"Accept": "application/json"}
        response = requests.post(self.token_url, data=request_body, headers=headers)
        token_data: dict = response.json()

        username = self._request_username(token_data["access_token"])

        now = int(time.time())
        new_credentials = OAuth2Credentials(
            provider=self.PROVIDER_NAME,
            title=current_credentials.title if current_credentials else None,
            username=username,
            access_token=token_data["access_token"],
            # Token refresh responses have an empty `scope` property (see docs),
            # so we have to get the scope from the existing credentials object.
            scopes=(
                token_data.get("scope", "").split(",")
                or (current_credentials.scopes if current_credentials else [])
            ),
            # Refresh token and expiration intervals are only given if token expiration
            # is enabled in the GitHub App's settings.
            refresh_token=token_data.get("refresh_token"),
            access_token_expires_at=(
                now + expires_in
                if (expires_in := token_data.get("expires_in", None))
                else None
            ),
            refresh_token_expires_at=(
                now + expires_in
                if (expires_in := token_data.get("refresh_token_expires_in", None))
                else None
            ),
        )
        if current_credentials:
            new_credentials.id = current_credentials.id
        return new_credentials

    def _request_username(self, access_token: str) -> str | None:
        url = "https://api.github.com/user"
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {access_token}",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        response = requests.get(url, headers=headers)

        if not response.ok:
            return None

        # Get the login (username)
        return response.json().get("login")
Example: Google integration
- Google OAuth2 handler: integrations/oauth/google.py
class GoogleOAuthHandler(BaseOAuthHandler):
    """
    Based on the documentation at https://developers.google.com/identity/protocols/oauth2/web-server
    """  # noqa

    PROVIDER_NAME = ProviderName.GOOGLE
    EMAIL_ENDPOINT = "https://www.googleapis.com/oauth2/v2/userinfo"
    DEFAULT_SCOPES = [
        "https://www.googleapis.com/auth/userinfo.email",
        "https://www.googleapis.com/auth/userinfo.profile",
        "openid",
    ]
You can see that the Google handler defines a DEFAULT_SCOPES variable. It sets the scopes that are always requested, no matter what the user asks for.
secrets = Secrets()
GOOGLE_OAUTH_IS_CONFIGURED = bool(
    secrets.google_client_id and secrets.google_client_secret
)
You can also see that GOOGLE_OAUTH_IS_CONFIGURED is used to disable the blocks that require OAuth if OAuth is not configured. This check lives in the __init__ method of each block. It is needed because there is no API key fallback for Google blocks, so we have to make sure that OAuth is configured before we allow the user to use the blocks.
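As a rough illustration of that check, the sketch below disables a block from its __init__ when the OAuth client settings are empty. Everything here is a stand-in: `FakeBlock`, its `disabled` parameter, `FakeGoogleBlock`, and `available_blocks` are assumptions made for the example, and the two module-level strings take the place of the real Secrets fields.

```python
class FakeBlock:
    """Hypothetical stand-in for the Block base class; `disabled` is an assumed parameter."""

    def __init__(self, id: str, disabled: bool = False) -> None:
        self.id = id
        self.disabled = disabled


# Stand-ins for the two Secrets fields; empty strings mean "not configured".
google_client_id = ""
google_client_secret = ""

GOOGLE_OAUTH_IS_CONFIGURED = bool(google_client_id and google_client_secret)


class FakeGoogleBlock(FakeBlock):
    def __init__(self) -> None:
        super().__init__(
            id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            # No API key fallback exists, so the block is off unless OAuth is set up.
            disabled=not GOOGLE_OAUTH_IS_CONFIGURED,
        )


def available_blocks(blocks: list[FakeBlock]) -> list[FakeBlock]:
    """Hypothetical helper: keep only blocks that are not disabled."""
    return [block for block in blocks if not block.disabled]
```

Evaluating the flag once at import time keeps every Google block consistent: either all of them are usable or none are.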
Webhook-triggered Blocks
Webhook-triggered blocks allow your agent to respond to external events in real-time. These blocks are triggered by incoming webhooks from third-party services rather than being executed manually.
Creating and running a webhook-triggered block involves three main components:
- The block itself, which specifies:
  - Inputs for the user to select a resource and events to subscribe to
  - A credentials input with the scopes needed to manage webhooks
  - Logic to turn the webhook payload into outputs for the webhook block
- The WebhooksManager for the corresponding webhook service provider, which handles:
  - (De)registering webhooks with the provider
  - Parsing and validating incoming webhook payloads
- The credentials system for the corresponding service provider, which may include an OAuthHandler

There is more going on under the hood, e.g. to store and retrieve webhooks and their links to nodes, but to add a webhook-triggered block you shouldn't need to make changes to those parts of the system.
Creating a Webhook-triggered Block
To create a webhook-triggered block, follow these additional steps on top of the basic block creation process:
- Define webhook_config in your block's __init__ method.

  Example: GitHubPullRequestTriggerBlock (backend/blocks/github/triggers.py)

      webhook_config=BlockWebhookConfig(
          provider="github",
          webhook_type=GithubWebhookType.REPO,
          resource_format="{repo}",
          event_filter_input="events",
          event_format="pull_request.{event}",
      ),

  BlockWebhookConfig definition (backend/data/block.py)

      class BlockWebhookConfig(BaseModel):
          provider: str
          """The service provider that the webhook connects to"""

          webhook_type: str
          """
          Identifier for the webhook type. E.g. GitHub has repo and organization level hooks.

          Only for use in the corresponding `WebhooksManager`.
          """

          resource_format: str
          """
          Template string for the resource that a block instance subscribes to.
          Fields will be filled from the block's inputs (except `payload`).

          Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)

          Only for use in the corresponding `WebhooksManager`.
          """

          event_filter_input: str
          """Name of the block's event filter input."""

          event_format: str = "{event}"
          """
          Template string for the event(s) that a block instance subscribes to.
          Applied individually to each event selected in the event filter input.

          Example: `"pull_request.{event}"` -> `"pull_request.opened"`
          """

- Define event filter input in your block's Input schema. This allows the user to select which specific types of events will trigger the block in their agent.

  Example: GitHubPullRequestTriggerBlock (backend/blocks/github/triggers.py)

      class Input(GitHubTriggerBase.Input):
          class EventsFilter(BaseModel):
              """
              https://docs.github.com/en/webhooks/webhook-events-and-payloads#pull_request
              """

              opened: bool = False
              edited: bool = False
              closed: bool = False
              reopened: bool = False
              synchronize: bool = False
              assigned: bool = False
              unassigned: bool = False
              labeled: bool = False
              unlabeled: bool = False
              converted_to_draft: bool = False
              locked: bool = False
              unlocked: bool = False
              enqueued: bool = False
              dequeued: bool = False
              milestoned: bool = False
              demilestoned: bool = False
              ready_for_review: bool = False
              review_requested: bool = False
              review_request_removed: bool = False
              auto_merge_enabled: bool = False
              auto_merge_disabled: bool = False

          events: EventsFilter = SchemaField(
              title="Events", description="The events to subscribe to"
          )

  - The name of the input field (events in this case) must match webhook_config.event_filter_input.
  - The event filter itself must be a Pydantic model with only boolean fields.

- Include payload field in your block's Input schema.

  Example: GitHubTriggerBase (backend/blocks/github/triggers.py)

      payload: dict = SchemaField(hidden=True, default={})

- Define credentials input in your block's Input schema.
  - Its scopes must be sufficient to manage a user's webhooks through the provider's API
  - See Blocks with authentication for further details

- Process webhook payload and output relevant parts of it in your block's run method.

  Example: GitHubPullRequestTriggerBlock

      def run(self, input_data: Input, **kwargs) -> BlockOutput:
          yield "payload", input_data.payload
          yield "sender", input_data.payload["sender"]
          yield "event", input_data.payload["action"]
          yield "number", input_data.payload["number"]
          yield "pull_request", input_data.payload["pull_request"]

  Note that the credentials parameter can be omitted if the credentials aren't used at block runtime, like in the example.
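To make the two template strings in webhook_config more concrete, the following sketch shows one way they could be expanded from a block's inputs. It follows the field descriptions above (event_format applied to each selected event, resource_format filled from the inputs except payload), but `selected_events` and `resource_for` are hypothetical helpers, and the actual expansion inside the platform may differ.

```python
def selected_events(events_filter: dict[str, bool], event_format: str) -> list[str]:
    """Apply event_format to every event the user switched on in the filter."""
    return [
        event_format.format(event=name)
        for name, enabled in events_filter.items()
        if enabled
    ]


def resource_for(resource_format: str, block_inputs: dict[str, object]) -> str:
    """Fill resource_format from the block's inputs, leaving out `payload`."""
    fields = {k: v for k, v in block_inputs.items() if k != "payload"}
    return resource_format.format(**fields)


inputs = {
    "repo": "Significant-Gravitas/AutoGPT",
    "events": {"opened": True, "closed": False, "synchronize": True},
    "payload": {},
}

events = selected_events(inputs["events"], "pull_request.{event}")
resource = resource_for("{repo}", inputs)
```

With these inputs, `events` holds `pull_request.opened` and `pull_request.synchronize`, and `resource` is the repository name. This is why the event filter has to be a model with only boolean fields: each field name doubles as an event name.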
Adding a Webhooks Manager
To add support for a new webhook provider, you'll need to create a WebhooksManager that implements the BaseWebhooksManager interface:
PROVIDER_NAME: ClassVar[ProviderName]

@abstractmethod
async def _register_webhook(
    self,
    credentials: Credentials,
    webhook_type: WT,
    resource: str,
    events: list[str],
    ingress_url: str,
    secret: str,
) -> tuple[str, dict]:
    """
    Registers a new webhook with the provider.

    Params:
        credentials: The credentials with which to create the webhook
        webhook_type: The provider-specific webhook type to create
        resource: The resource to receive events for
        events: The events to subscribe to
        ingress_url: The ingress URL for webhook payloads
        secret: Secret used to verify webhook payloads

    Returns:
        str: Webhook ID assigned by the provider
        config: Provider-specific configuration for the webhook
    """
    ...

@classmethod
@abstractmethod
async def validate_payload(
    cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
    """
    Validates an incoming webhook request and returns its payload and type.

    Params:
        webhook: Object representing the configured webhook and its properties in our system.
        request: Incoming FastAPI `Request`

    Returns:
        dict: The validated payload
        str: The event type associated with the payload
    """

@abstractmethod
async def _deregister_webhook(
    self, webhook: integrations.Webhook, credentials: Credentials
) -> None: ...

async def trigger_ping(
    self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
    """
    Triggers a ping to the given webhook.

    Raises:
        NotImplementedError: if the provider doesn't support pinging
    """
And add a reference to your WebhooksManager class in WEBHOOK_MANAGERS_BY_NAME:
WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
    handler.PROVIDER_NAME: handler
    for handler in [
        GithubWebhooksManager,
        Slant3DWebhooksManager,
    ]
}
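The secret passed to _register_webhook is what validate_payload later uses to verify incoming requests. As a minimal sketch of that verification step, assuming the provider signs the raw request body with HMAC-SHA256 and sends the result as a `sha256=`-prefixed hex string (the scheme GitHub uses), the check could look like this. `sign`, `is_valid_signature`, and the sample secret and body are hypothetical; other providers may sign payloads differently.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Compute a GitHub-style signature value: 'sha256=' + hex HMAC-SHA256."""
    digest = hmac.new(secret.encode("utf-8"), msg=body, digestmod=hashlib.sha256)
    return "sha256=" + digest.hexdigest()


def is_valid_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Check the raw request body against the signature the sender supplied."""
    expected = sign(secret, body)
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters matched.
    return hmac.compare_digest(expected, signature_header)


secret = "hypothetical-webhook-secret"
body = b'{"action": "opened", "number": 1}'
good_header = sign(secret, body)
```

The signature must be computed over the exact bytes received, so a manager should read the raw body before parsing it as JSON.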
Example: GitHub Webhook Integration
GitHub Webhook triggers: blocks/github/triggers.py
class GitHubTriggerBase:
    class Input(BlockSchema):
        credentials: GithubCredentialsInput = GithubCredentialsField("repo")
        repo: str = SchemaField(
            description=(
                "Repository to subscribe to.\n\n"
                "**Note:** Make sure your GitHub credentials have permissions "
                "to create webhooks on this repo."
            ),
            placeholder="{owner}/{repo}",
        )
        payload: dict = SchemaField(hidden=True, default={})

    class Output(BlockSchema):
        payload: dict = SchemaField(
            description="The complete webhook payload that was received from GitHub. "
            "Includes information about the affected resource (e.g. pull request), "
            "the event, and the user who triggered the event."
        )
        triggered_by_user: dict = SchemaField(
            description="Object representing the GitHub user who triggered the event"
        )
        error: str = SchemaField(
            description="Error message if the payload could not be processed"
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        yield "payload", input_data.payload
        yield "triggered_by_user", input_data.payload["sender"]


class GithubPullRequestTriggerBlock(GitHubTriggerBase, Block):
    EXAMPLE_PAYLOAD_FILE = (
        Path(__file__).parent / "example_payloads" / "pull_request.synchronize.json"
    )

    class Input(GitHubTriggerBase.Input):
        class EventsFilter(BaseModel):
            """
            https://docs.github.com/en/webhooks/webhook-events-and-payloads#pull_request
            """

            opened: bool = False
            edited: bool = False
            closed: bool = False
            reopened: bool = False
            synchronize: bool = False
            assigned: bool = False
            unassigned: bool = False
            labeled: bool = False
            unlabeled: bool = False
            converted_to_draft: bool = False
            locked: bool = False
            unlocked: bool = False
            enqueued: bool = False
            dequeued: bool = False
            milestoned: bool = False
            demilestoned: bool = False
            ready_for_review: bool = False
            review_requested: bool = False
            review_request_removed: bool = False
            auto_merge_enabled: bool = False
            auto_merge_disabled: bool = False

        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )

    class Output(GitHubTriggerBase.Output):
        event: str = SchemaField(
            description="The PR event that triggered the webhook (e.g. 'opened')"
        )
        number: int = SchemaField(description="The number of the affected pull request")
        pull_request: dict = SchemaField(
            description="Object representing the affected pull request"
        )
        pull_request_url: str = SchemaField(
            description="The URL of the affected pull request"
        )

    def __init__(self):
        from backend.integrations.webhooks.github import GithubWebhookType

        example_payload = json.loads(
            self.EXAMPLE_PAYLOAD_FILE.read_text(encoding="utf-8")
        )

        super().__init__(
            id="6c60ec01-8128-419e-988f-96a063ee2fea",
            description="This block triggers on pull request events and outputs the event type and payload.",
            categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT},
            input_schema=GithubPullRequestTriggerBlock.Input,
            output_schema=GithubPullRequestTriggerBlock.Output,
            webhook_config=BlockWebhookConfig(
                provider="github",
                webhook_type=GithubWebhookType.REPO,
                resource_format="{repo}",
                event_filter_input="events",
                event_format="pull_request.{event}",
            ),
            test_input={
                "repo": "Significant-Gravitas/AutoGPT",
                "events": {"opened": True, "synchronize": True},
                "credentials": TEST_CREDENTIALS_INPUT,
                "payload": example_payload,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("payload", example_payload),
                ("triggered_by_user", example_payload["sender"]),
                ("event", example_payload["action"]),
                ("number", example_payload["number"]),
                ("pull_request", example_payload["pull_request"]),
                ("pull_request_url", example_payload["pull_request"]["html_url"]),
            ],
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:  # type: ignore
        yield from super().run(input_data, **kwargs)
        yield "event", input_data.payload["action"]
        yield "number", input_data.payload["number"]
        yield "pull_request", input_data.payload["pull_request"]
        yield "pull_request_url", input_data.payload["pull_request"]["html_url"]
GitHub Webhooks Manager: integrations/webhooks/github.py
class GithubWebhookType(StrEnum):
    REPO = "repo"


class GithubWebhooksManager(BaseWebhooksManager):
    PROVIDER_NAME = ProviderName.GITHUB

    WebhookType = GithubWebhookType

    GITHUB_API_URL = "https://api.github.com"
    GITHUB_API_DEFAULT_HEADERS = {"Accept": "application/vnd.github.v3+json"}

    @classmethod
    async def validate_payload(
        cls, webhook: integrations.Webhook, request: Request
    ) -> tuple[dict, str]:
        if not (event_type := request.headers.get("X-GitHub-Event")):
            raise HTTPException(
                status_code=400, detail="X-GitHub-Event header is missing!"
            )

        if not (signature_header := request.headers.get("X-Hub-Signature-256")):
            raise HTTPException(
                status_code=403, detail="X-Hub-Signature-256 header is missing!"
            )

        payload_body = await request.body()
        hash_object = hmac.new(
            webhook.secret.encode("utf-8"), msg=payload_body, digestmod=hashlib.sha256
        )
        expected_signature = "sha256=" + hash_object.hexdigest()

        if not hmac.compare_digest(expected_signature, signature_header):
            raise HTTPException(
                status_code=403, detail="Request signatures didn't match!"
            )

        payload = await request.json()
        if action := payload.get("action"):
            event_type += f".{action}"

        return payload, event_type

    async def trigger_ping(
        self, webhook: integrations.Webhook, credentials: Credentials | None
    ) -> None:
        if not credentials:
            raise ValueError("Credentials are required but were not passed")

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }

        repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
        ping_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{github_hook_id}/pings"

        response = requests.post(ping_url, headers=headers)

        if response.status_code != 204:
            error_msg = extract_github_error_msg(response)
            raise ValueError(f"Failed to ping GitHub webhook: {error_msg}")

    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: GithubWebhookType,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> tuple[str, dict]:
        if webhook_type == self.WebhookType.REPO and resource.count("/") > 1:
            raise ValueError("Invalid repo format: expected 'owner/repo'")

        # Extract main event, e.g. `pull_request.opened` -> `pull_request`
        github_events = list({event.split(".")[0] for event in events})

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }
        webhook_data = {
            "name": "web",
            "active": True,
            "events": github_events,
            "config": {
                "url": ingress_url,
                "content_type": "json",
                "insecure_ssl": "0",
                "secret": secret,
            },
        }

        response = requests.post(
            f"{self.GITHUB_API_URL}/repos/{resource}/hooks",
            headers=headers,
            json=webhook_data,
        )

        if response.status_code != 201:
            error_msg = extract_github_error_msg(response)
            if "not found" in error_msg.lower():
                error_msg = (
                    f"{error_msg} "
                    "(Make sure the GitHub account or API key has 'repo' or "
                    f"webhook create permissions to '{resource}')"
                )
            raise ValueError(f"Failed to create GitHub webhook: {error_msg}")

        webhook_id = response.json()["id"]
        config = response.json()["config"]

        return str(webhook_id), config

    async def _deregister_webhook(
        self, webhook: integrations.Webhook, credentials: Credentials
    ) -> None:
        webhook_type = self.WebhookType(webhook.webhook_type)
        if webhook.credentials_id != credentials.id:
            raise ValueError(
                f"Webhook #{webhook.id} does not belong to credentials {credentials.id}"
            )

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }

        if webhook_type == self.WebhookType.REPO:
            repo = webhook.resource
            delete_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{webhook.provider_webhook_id}"  # noqa
        else:
            raise NotImplementedError(
                f"Unsupported webhook type '{webhook.webhook_type}'"
            )

        response = requests.delete(delete_url, headers=headers)

        if response.status_code not in [204, 404]:
            # 204 means successful deletion, 404 means the webhook was already deleted
            error_msg = extract_github_error_msg(response)
            raise ValueError(f"Failed to delete GitHub webhook: {error_msg}")

        # If we reach here, the webhook was successfully deleted or didn't exist
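The signature check in validate_payload can be tried out in isolation. The following is a minimal sketch, using only the Python standard library, of how a sender would compute the X-Hub-Signature-256 value and how a receiver would verify it. The helper names sign_payload and is_valid_signature, the sample secret, and the sample body are hypothetical and not part of the platform.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return the header value a sender would attach to `body` (hypothetical helper)."""
    digest = hmac.new(secret.encode("utf-8"), msg=body, digestmod=hashlib.sha256)
    return "sha256=" + digest.hexdigest()


def is_valid_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, signature_header)


# Sample data, made up for this illustration
body = b'{"action": "opened", "number": 1}'
header = sign_payload("example-secret", body)

print(is_valid_signature("example-secret", body, header))  # True
print(is_valid_signature("wrong-secret", body, header))    # False
```

Note that the signature is computed over the raw request body, so it should be checked before the body is parsed as JSON, as validate_payload does.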
Key Points to Remember¶
- Unique ID: Give your block a unique ID in the __init__ method.
- Input and Output Schemas: Define clear input and output schemas.
- Error Handling: Implement error handling in the run method.
- Output Results: Use yield to output results in the run method.
- Testing: Provide test input and output in the __init__ method for automatic testing.
Understanding the Testing Process¶
The testing of blocks is handled by test_block.py, which does the following:
- It calls the block with the provided test_input. If the block has a credentials field, test_credentials is passed in as well.
- If a test_mock is provided, it temporarily replaces the specified methods with the mock functions.
- It then asserts that the output matches the test_output.
For the WikipediaSummaryBlock:
- The test will call the block with the topic "Artificial Intelligence".
- Instead of making a real API call, it will use the mock function, which returns {"extract": "summary content"}.
- It will then check if the output key is "summary" and its value is a string.
This approach allows us to test the block's logic comprehensively without relying on external services, while also accommodating non-deterministic outputs.
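The flow described above can be sketched in a few lines. This is an illustration only, not the code in test_block.py: FakeSummaryBlock, run_with_mocks, and the example URL are hypothetical names made up for this sketch, and the block here is a plain class rather than a real Block subclass.

```python
from typing import Any, Callable, Iterator


class FakeSummaryBlock:
    """Hypothetical stand-in for a block; it does not use the real Block class."""

    test_input = {"topic": "Artificial Intelligence"}
    test_mock = {"get_request": lambda url: {"extract": "summary content"}}

    def get_request(self, url: str) -> dict[str, Any]:
        # A real block would make a network call here.
        raise RuntimeError("network access is not available in tests")

    def run(self, input_data: dict[str, Any]) -> Iterator[tuple[str, Any]]:
        response = self.get_request(f"https://example.invalid/{input_data['topic']}")
        yield "summary", response["extract"]


def run_with_mocks(block: Any) -> list[tuple[str, Any]]:
    """Temporarily replace the mocked methods, run the block, then restore them."""
    mocks: dict[str, Callable[..., Any]] = getattr(block, "test_mock", {})
    for name, mock in mocks.items():
        setattr(block, name, mock)  # shadows the method on this instance only
    try:
        return list(block.run(block.test_input))
    finally:
        for name in mocks:
            delattr(block, name)  # the class-level method becomes visible again


outputs = run_with_mocks(FakeSummaryBlock())
name, value = outputs[0]
assert name == "summary" and isinstance(value, str)
print(outputs)  # [('summary', 'summary content')]
```

Because the mock is removed in a finally clause, the original method is restored even if the block raises during the run.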
Security Best Practices for SSRF Prevention¶
When creating blocks that handle external URL inputs or make network requests, it's crucial to use the platform's built-in SSRF protection mechanisms. The backend.util.request module provides a secure Requests wrapper class that should be used for all HTTP requests.
Using the Secure Requests Wrapper¶
from backend.util.request import requests


class MyNetworkBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        try:
            # The requests wrapper automatically validates URLs and blocks dangerous requests
            response = requests.get(input_data.url)
            yield "result", response.text
        except ValueError as e:
            # URL validation failed
            raise RuntimeError(f"Invalid URL provided: {e}")
        except requests.exceptions.RequestException as e:
            # Request failed
            raise RuntimeError(f"Request failed: {e}")
The Requests wrapper provides these security features:
- URL Validation:
  - Blocks requests to private IP ranges (RFC 1918)
  - Validates URL format and protocol
  - Resolves DNS and checks IP addresses
  - Supports whitelisting trusted origins
- Secure Defaults:
  - Disables redirects by default
  - Raises exceptions for non-200 status codes
  - Supports custom headers and validators
- Protected IP Ranges: The wrapper denies requests to these networks:
backend/util/request.py
ipaddress.ip_network("0.0.0.0/8"),  # "This" Network
ipaddress.ip_network("10.0.0.0/8"),  # Private-Use
ipaddress.ip_network("127.0.0.0/8"),  # Loopback
ipaddress.ip_network("169.254.0.0/16"),  # Link Local
ipaddress.ip_network("172.16.0.0/12"),  # Private-Use
ipaddress.ip_network("192.168.0.0/16"),  # Private-Use
ipaddress.ip_network("224.0.0.0/4"),  # Multicast
ipaddress.ip_network("240.0.0.0/4"),  # Reserved for Future Use
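To illustrate how a deny-list like this can be applied, here is a minimal sketch using the standard ipaddress module. It is not the code in backend/util/request.py: the function name is_blocked_ip is hypothetical, and the sketch only checks IPv4 address literals, without the DNS resolution step the wrapper performs.

```python
import ipaddress

# Same networks as listed above
BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),  # "This" Network
    ipaddress.ip_network("10.0.0.0/8"),  # Private-Use
    ipaddress.ip_network("127.0.0.0/8"),  # Loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link Local
    ipaddress.ip_network("172.16.0.0/12"),  # Private-Use
    ipaddress.ip_network("192.168.0.0/16"),  # Private-Use
    ipaddress.ip_network("224.0.0.0/4"),  # Multicast
    ipaddress.ip_network("240.0.0.0/4"),  # Reserved for Future Use
]


def is_blocked_ip(address: str) -> bool:
    """Return True if `address` falls inside any blocked network (hypothetical helper)."""
    ip = ipaddress.ip_address(address)
    return any(ip in network for network in BLOCKED_NETWORKS)


print(is_blocked_ip("127.0.0.1"))        # True  (loopback)
print(is_blocked_ip("169.254.169.254"))  # True  (link local)
print(is_blocked_ip("172.31.255.255"))   # True  (inside 172.16.0.0/12)
print(is_blocked_ip("8.8.8.8"))          # False (public address)
```

A check like this has to run against the resolved IP address rather than the hostname, since a public-looking hostname can resolve to a private address.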
Custom Request Configuration¶
If you need to customize the request behavior:
from backend.util.request import Requests

# Create a custom requests instance with specific trusted origins
custom_requests = Requests(
    trusted_origins=["api.trusted-service.com"],
    raise_for_status=True,
    extra_headers={"User-Agent": "MyBlock/1.0"}
)
Tips for Effective Block Testing¶
- Provide realistic test_input: Ensure your test input covers typical use cases.
- Define appropriate test_output:
  - For deterministic outputs, use specific expected values.
  - For non-deterministic outputs or when only the type matters, use Python types (e.g., str, int, dict).
  - You can mix specific values and types, e.g., ("key1", str), ("key2", 42).
- Use test_mock for network calls: This prevents tests from failing due to network issues or API changes.
- Consider omitting test_mock for blocks without external dependencies: If your block doesn't make network calls or use external resources, you might not need a mock.
- Consider edge cases: Include tests for potential error conditions in your run method.
- Update tests when changing block behavior: If you modify your block, ensure the tests are updated accordingly.
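The difference between exact values and type-only expectations in test_output can be illustrated with a small comparison helper. This is a sketch under the assumption that a Python type in test_output means "any value of this type"; output_matches is a hypothetical name, not a platform function.

```python
from typing import Any


def output_matches(
    expected: list[tuple[str, Any]], actual: list[tuple[str, Any]]
) -> bool:
    """Compare outputs pairwise: types match by isinstance, other values by equality."""
    if len(expected) != len(actual):
        return False
    for (exp_name, exp_value), (name, value) in zip(expected, actual):
        if exp_name != name:
            return False
        if isinstance(exp_value, type):
            # Type-only expectation, e.g. ("key1", str)
            if not isinstance(value, exp_value):
                return False
        elif exp_value != value:
            # Exact expectation, e.g. ("key2", 42)
            return False
    return True


test_output = [("key1", str), ("key2", 42)]

print(output_matches(test_output, [("key1", "anything"), ("key2", 42)]))  # True
print(output_matches(test_output, [("key1", "anything"), ("key2", 43)]))  # False
print(output_matches(test_output, [("key1", 123), ("key2", 42)]))         # False
```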
By following these steps, you can create new blocks that extend the functionality of the AutoGPT Agent Server.
Blocks we want to see¶
Below is a list of blocks that we would like to see implemented in the AutoGPT Agent Server. If you're interested in contributing, feel free to pick one of these blocks or choose your own.
If you would like to implement one of these blocks, open a pull request and we will start the review process.
Consumer Services/Platforms¶
- Google Sheets - Read/Append
- Email - Read/Send with Gmail, Outlook, Yahoo, Proton, etc.
- Calendar - Read/Write with Google Calendar, Outlook Calendar, etc.
- Home Assistant - Call Service, Get Status
- Dominos - Order Pizza, Track Order
- Uber - Book Ride, Track Ride
- Notion - Create/Read Page, Create/Append/Read DB
- Google Drive - Read/Write/Overwrite File/Folder
Social Media¶
- Twitter - Post, Reply, Get Replies, Get Comments, Get Followers, Get Following, Get Tweets, Get Mentions
- Instagram - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
- TikTok - Post, Reply, Get Comments, Get Followers, Get Following, Get Videos, Get Mentions, Get Trending Videos
- LinkedIn - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
- YouTube - Transcribe Videos/Shorts, Post Videos/Shorts, Read/Reply/React to Comments, Update Thumbnails, Update Description, Update Tags, Update Titles, Get Views, Get Likes, Get Dislikes, Get Subscribers, Get Comments, Get Shares, Get Watch Time, Get Revenue, Get Trending Videos, Get Top Videos, Get Top Channels
- Reddit - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
- Treatwell (and related Platforms) - Book, Cancel, Review, Get Recommendations
- Substack - Read/Subscribe/Unsubscribe, Post/Reply, Get Recommendations
- Discord - Read/Post/Reply, Moderation actions
- GoodReads - Read/Post/Reply, Get Recommendations
E-commerce¶
- Airbnb - Book, Cancel, Review, Get Recommendations
- Amazon - Order, Track Order, Return, Review, Get Recommendations
- eBay - Order, Track Order, Return, Review, Get Recommendations
- Upwork - Post Jobs, Hire Freelancer, Review Freelancer, Fire Freelancer
Business Tools¶
- External Agents - Call other agents similar to AutoGPT
- Trello - Create/Read/Update/Delete Cards, Lists, Boards
- Jira - Create/Read/Update/Delete Issues, Projects, Boards
- Linear - Create/Read/Update/Delete Issues, Projects, Boards
- Excel - Read/Write/Update/Delete Rows, Columns, Sheets
- Slack - Read/Post/Reply to Messages, Create Channels, Invite Users
- ERPNext - Create/Read/Update/Delete Invoices, Orders, Customers, Products
- Salesforce - Create/Read/Update/Delete Leads, Opportunities, Accounts
- HubSpot - Create/Read/Update/Delete Contacts, Deals, Companies
- Zendesk - Create/Read/Update/Delete Tickets, Users, Organizations
- Odoo - Create/Read/Update/Delete Sales Orders, Invoices, Customers
- Shopify - Create/Read/Update/Delete Products, Orders, Customers
- WooCommerce - Create/Read/Update/Delete Products, Orders, Customers
- Squarespace - Create/Read/Update/Delete Pages, Products, Orders
Agent Templates we want to see¶
Data/Information¶
- Summarize the top news of today, this week, or this month via Apple News or other large media outlets (BBC, TechCrunch, Hacker News, etc.)
- Create, read, and summarize Substack newsletters or any other newsletters (blog writer vs blog reader)
- Get/read/summarize the most viral Twitter, Instagram, TikTok (general social media accounts) posts of the day, week, month
- Get/Read any LinkedIn posts or profiles that mention AI Agents
- Read/Summarize Discord (might not be able to do this because you need access)
- Read / Get most read books in a given month, year, etc from GoodReads or Amazon Books, etc
- Get dates for specific shows across all streaming services
- Suggest/Recommend/Get most watched shows in a given month, year, etc across all streaming platforms
- Data analysis from xlsx data set
- Gather data via Excel or Google Sheets > sample the data randomly (sample block takes top X, bottom X, randomly, etc.) > pass that to an LLM Block to generate a script for analysis of the full data > Python block to run the script > loop back through an LLM Fix Block on error > create chart/visualization (potentially in the code block?) > show the image as output (this may require frontend changes to show)
- TikTok video search and download
Marketing¶
- Portfolio site design and enhancements