Skip to content

Contributing to AutoGPT Agent Server: Creating and Testing Blocks

This guide will walk you through the process of creating and testing a new block for the AutoGPT Agent Server, using the WikipediaSummaryBlock as an example.

Understanding Blocks and Testing

Blocks are reusable components that can be connected to form a graph representing an agent's behavior. Each block has inputs, outputs, and a specific function. Proper testing is crucial to ensure blocks work correctly and consistently.

Creating and Testing a New Block

Follow these steps to create and test a new block:

  1. Create a new Python file for your block in the autogpt_platform/backend/backend/blocks directory. Name it descriptively and use snake_case. For example: get_wikipedia_summary.py.

  2. Import necessary modules and create a class that inherits from Block. Make sure to include all necessary imports for your block.

    Every block should contain the following:

    from backend.data.block import Block, BlockSchema, BlockOutput
    

    Example for the Wikipedia summary block:

    from backend.data.block import Block, BlockSchema, BlockOutput
    from backend.utils.get_request import GetRequest
    import requests
    
    class WikipediaSummaryBlock(Block, GetRequest):
        # Block implementation will go here
    
  3. Define the input and output schemas using BlockSchema. These schemas specify the data structure that the block expects to receive (input) and produce (output).

  4. The input schema defines the structure of the data the block will process. Each field in the schema represents a required piece of input data.

  5. The output schema defines the structure of the data the block will return after processing. Each field in the schema represents a piece of output data.

    Example:

    class Input(BlockSchema):
        topic: str  # The topic to get the Wikipedia summary for
    
    class Output(BlockSchema):
        summary: str  # The summary of the topic from Wikipedia
        error: str  # Any error message if the request fails, error field needs to be named `error`.
    
  6. Implement the __init__ method, including test data and mocks:

    Important

    Use UUID generator (e.g. https://www.uuidgenerator.net/) for every new block id and do not make up your own. Alternatively, you can run this python code to generate an uuid: print(__import__('uuid').uuid4())

    def __init__(self):
        super().__init__(
            # Unique ID for the block, used across users for templates
            # If you are an AI leave it as is or change to "generate-proper-uuid"
            id="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            input_schema=WikipediaSummaryBlock.Input,  # Assign input schema
            output_schema=WikipediaSummaryBlock.Output,  # Assign output schema
    
                # Provide sample input, output and test mock for testing the block
    
            test_input={"topic": "Artificial Intelligence"},
            test_output=("summary", "summary content"),
            test_mock={"get_request": lambda url, json: {"extract": "summary content"}},
        )
    
    • id: A unique identifier for the block.

    • input_schema and output_schema: Define the structure of the input and output data.

    Let's break down the testing components:

    • test_input: This is a sample input that will be used to test the block. It should be a valid input according to your Input schema.

    • test_output: This is the expected output when running the block with the test_input. It should match your Output schema. For non-deterministic outputs or when you only want to assert the type, you can use Python types instead of specific values. In this example, ("summary", str) asserts that the output key is "summary" and its value is a string.

    • test_mock: This is crucial for blocks that make network calls. It provides a mock function that replaces the actual network call during testing.

    In this case, we're mocking the get_request method to always return a dictionary with an 'extract' key, simulating a successful API response. This allows us to test the block's logic without making actual network requests, which could be slow, unreliable, or rate-limited.

  7. Implement the run method with error handling. This should contain the main logic of the block:

def run(self, input_data: Input, **kwargs) -> BlockOutput:
    try:
        topic = input_data.topic
        url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic}"

        response = self.get_request(url, json=True)
        yield "summary", response['extract']

    except requests.exceptions.HTTPError as http_err:
        raise RuntimeError(f"HTTP error occurred: {http_err}")
  • Try block: Contains the main logic to fetch and process the Wikipedia summary.
  • API request: Send a GET request to the Wikipedia API.
  • Error handling: Handle various exceptions that might occur during the API request and data processing. We don't need to catch all exceptions, only the ones we expect and can handle. The uncaught exceptions will be automatically yielded as error in the output. Any block that raises an exception (or yields an error output) will be marked as failed. Prefer raising exceptions over yielding error, as it will stop the execution immediately.
  • Yield: Use yield to output the results. Prefer to output one result object at a time. If you are calling a function that returns a list, you can yield each item in the list separately. You can also yield the whole list as well, but do both rather than yielding the list. For example: If you were writing a block that outputs emails, you'd yield each email as a separate result object, but you could also yield the whole list as an additional single result object. Yielding output named error will break the execution right away and mark the block execution as failed.

Blocks with authentication

Our system supports auth offloading for API keys and OAuth2 authorization flows. Adding a block with API key authentication is straight-forward, as is adding a block for a service that we already have OAuth2 support for.

Implementing the block itself is relatively simple. On top of the instructions above, you're going to add a credentials parameter to the Input model and the run method:

from backend.data.model import (
    APIKeyCredentials,
    OAuth2Credentials,
    Credentials,
)

from backend.data.block import Block, BlockOutput, BlockSchema
from backend.data.model import CredentialsField
from backend.integrations.providers import ProviderName


# API Key auth:
class BlockWithAPIKeyAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is require or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["api_key"]
        ] = CredentialsField(
            description="The GitHub integration can be used with "
            "any API key with sufficient permissions for the blocks it is used on.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: APIKeyCredentials,
        **kwargs,
    ) -> BlockOutput:
        ...

# OAuth:
class BlockWithOAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is require or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["oauth2"]
        ] = CredentialsField(
            required_scopes={"repo"},
            description="The GitHub integration can be used with OAuth.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: OAuth2Credentials,
        **kwargs,
    ) -> BlockOutput:
        ...

# API Key auth + OAuth:
class BlockWithAPIKeyAndOAuth(Block):
    class Input(BlockSchema):
        # Note that the type hint below is require or you will get a type error.
        # The first argument is the provider name, the second is the credential type.
        credentials: CredentialsMetaInput[
            Literal[ProviderName.GITHUB], Literal["api_key", "oauth2"]
        ] = CredentialsField(
            required_scopes={"repo"},
            description="The GitHub integration can be used with OAuth, "
            "or any API key with sufficient permissions for the blocks it is used on.",
        )

    # ...

    def run(
        self,
        input_data: Input,
        *,
        credentials: Credentials,
        **kwargs,
    ) -> BlockOutput:
        ...

The credentials will be automagically injected by the executor in the back end.

The APIKeyCredentials and OAuth2Credentials models are defined here. To use them in e.g. an API request, you can either access the token directly:

# credentials: APIKeyCredentials
response = requests.post(
    url,
    headers={
        "Authorization": f"Bearer {credentials.api_key.get_secret_value()})",
    },
)

# credentials: OAuth2Credentials
response = requests.post(
    url,
    headers={
        "Authorization": f"Bearer {credentials.access_token.get_secret_value()})",
    },
)

or use the shortcut credentials.bearer():

# credentials: APIKeyCredentials | OAuth2Credentials
response = requests.post(
    url,
    headers={"Authorization": credentials.bearer()},
)

The ProviderName enum is the single source of truth for which providers exist in our system. Naturally, to add an authenticated block for a new provider, you'll have to add it here too.

ProviderName definition
backend/integrations/providers.py
class ProviderName(str, Enum):
    ANTHROPIC = "anthropic"
    DISCORD = "discord"
    D_ID = "d_id"
    E2B = "e2b"
    EXA = "exa"
    FAL = "fal"
    GITHUB = "github"
    GOOGLE = "google"
    GOOGLE_MAPS = "google_maps"
    GROQ = "groq"
    HUBSPOT = "hubspot"
    IDEOGRAM = "ideogram"
    JINA = "jina"
    MEDIUM = "medium"
    NOTION = "notion"
    OLLAMA = "ollama"
    OPENAI = "openai"
    OPENWEATHERMAP = "openweathermap"
    OPEN_ROUTER = "open_router"
    PINECONE = "pinecone"
    REPLICATE = "replicate"
    REVID = "revid"
    SLANT3D = "slant3d"
    UNREAL_SPEECH = "unreal_speech"

Adding an OAuth2 service integration

To add support for a new OAuth2-authenticated service, you'll need to add an OAuthHandler. All our existing handlers and the base class can be found here.

Every handler must implement the following parts of the [BaseOAuthHandler] interface:

backend/integrations/oauth/base.py
PROVIDER_NAME: ClassVar[ProviderName]
DEFAULT_SCOPES: ClassVar[list[str]] = []
def __init__(self, client_id: str, client_secret: str, redirect_uri: str): ...

def get_login_url(self, scopes: list[str], state: str) -> str:
def exchange_code_for_tokens(
    self, code: str, scopes: list[str]
) -> OAuth2Credentials:
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:

As you can see, this is modeled after the standard OAuth2 flow.

Aside from implementing the OAuthHandler itself, adding a handler into the system requires two more things:

backend/integrations/oauth/__init__.py
HANDLERS_BY_NAME: dict["ProviderName", type["BaseOAuthHandler"]] = {
    handler.PROVIDER_NAME: handler
    for handler in [
        GitHubOAuthHandler,
        GoogleOAuthHandler,
        NotionOAuthHandler,
    ]
}
  • Adding {provider}_client_id and {provider}_client_secret to the application's Secrets under util/settings.py
backend/util/settings.py
github_client_id: str = Field(default="", description="GitHub OAuth client ID")
github_client_secret: str = Field(
    default="", description="GitHub OAuth client secret"
)

Adding to the frontend

You will need to add the provider (api or oauth) to the CredentialsInput component in frontend/src/components/integrations/credentials-input.tsx.

frontend/src/components/integrations/credentials-input.tsx
export const providerIcons: Record<
  CredentialsProviderName,
  React.FC<{ className?: string }>
> = {
  anthropic: fallbackIcon,
  e2b: fallbackIcon,
  github: FaGithub,
  google: FaGoogle,
  groq: fallbackIcon,
  notion: NotionLogoIcon,
  discord: FaDiscord,
  d_id: fallbackIcon,
  google_maps: FaGoogle,
  jina: fallbackIcon,
  ideogram: fallbackIcon,
  medium: FaMedium,
  ollama: fallbackIcon,
  openai: fallbackIcon,
  openweathermap: fallbackIcon,
  open_router: fallbackIcon,
  pinecone: fallbackIcon,
  slant3d: fallbackIcon,
  replicate: fallbackIcon,
  fal: fallbackIcon,
  revid: fallbackIcon,
  unreal_speech: fallbackIcon,
  exa: fallbackIcon,
  hubspot: fallbackIcon,
};

You will also need to add the provider to the CredentialsProvider component in frontend/src/components/integrations/credentials-provider.tsx.

frontend/src/components/integrations/credentials-provider.tsx
const providerDisplayNames: Record<CredentialsProviderName, string> = {
  anthropic: "Anthropic",
  discord: "Discord",
  d_id: "D-ID",
  e2b: "E2B",
  github: "GitHub",
  google: "Google",
  google_maps: "Google Maps",
  groq: "Groq",
  ideogram: "Ideogram",
  jina: "Jina",
  medium: "Medium",
  notion: "Notion",
  ollama: "Ollama",
  openai: "OpenAI",
  openweathermap: "OpenWeatherMap",
  open_router: "Open Router",
  pinecone: "Pinecone",
  slant3d: "Slant3D",
  replicate: "Replicate",
  fal: "FAL",
  revid: "Rev.ID",
  unreal_speech: "Unreal Speech",
  exa: "Exa",
  hubspot: "Hubspot",
} as const;

Finally you will need to add the provider to the CredentialsType enum in frontend/src/lib/autogpt-server-api/types.ts.

frontend/src/lib/autogpt-server-api/types.ts
export const PROVIDER_NAMES = {
  ANTHROPIC: "anthropic",
  D_ID: "d_id",
  DISCORD: "discord",
  E2B: "e2b",
  GITHUB: "github",
  GOOGLE: "google",
  GOOGLE_MAPS: "google_maps",
  GROQ: "groq",
  IDEOGRAM: "ideogram",
  JINA: "jina",
  MEDIUM: "medium",
  NOTION: "notion",
  OLLAMA: "ollama",
  OPENAI: "openai",
  OPENWEATHERMAP: "openweathermap",
  OPEN_ROUTER: "open_router",
  PINECONE: "pinecone",
  SLANT3D: "slant3d",
  REPLICATE: "replicate",
  FAL: "fal",
  REVID: "revid",
  UNREAL_SPEECH: "unreal_speech",
  EXA: "exa",
  HUBSPOT: "hubspot",
} as const;

Example: GitHub integration

backend/blocks/github/issues.py
class GithubCommentBlock(Block):
    class Input(BlockSchema):
        credentials: GithubCredentialsInput = GithubCredentialsField("repo")
        issue_url: str = SchemaField(
            description="URL of the GitHub issue or pull request",
            placeholder="https://github.com/owner/repo/issues/1",
        )
        comment: str = SchemaField(
            description="Comment to post on the issue or pull request",
            placeholder="Enter your comment",
        )

    class Output(BlockSchema):
        id: int = SchemaField(description="ID of the created comment")
        url: str = SchemaField(description="URL to the comment on GitHub")
        error: str = SchemaField(
            description="Error message if the comment posting failed"
        )

    def __init__(self):
        super().__init__(
            id="a8db4d8d-db1c-4a25-a1b0-416a8c33602b",
            description="This block posts a comment on a specified GitHub issue or pull request.",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=GithubCommentBlock.Input,
            output_schema=GithubCommentBlock.Output,
            test_input=[
                {
                    "issue_url": "https://github.com/owner/repo/issues/1",
                    "comment": "This is a test comment.",
                    "credentials": TEST_CREDENTIALS_INPUT,
                },
                {
                    "issue_url": "https://github.com/owner/repo/pull/1",
                    "comment": "This is a test comment.",
                    "credentials": TEST_CREDENTIALS_INPUT,
                },
            ],
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("id", 1337),
                ("url", "https://github.com/owner/repo/issues/1#issuecomment-1337"),
                ("id", 1337),
                (
                    "url",
                    "https://github.com/owner/repo/issues/1#issuecomment-1337",
                ),
            ],
            test_mock={
                "post_comment": lambda *args, **kwargs: (
                    1337,
                    "https://github.com/owner/repo/issues/1#issuecomment-1337",
                )
            },
        )

    @staticmethod
    def post_comment(
        credentials: GithubCredentials, issue_url: str, body_text: str
    ) -> tuple[int, str]:
        api = get_api(credentials)
        data = {"body": body_text}
        if "pull" in issue_url:
            issue_url = issue_url.replace("pull", "issues")
        comments_url = issue_url + "/comments"
        response = api.post(comments_url, json=data)
        comment = response.json()
        return comment["id"], comment["html_url"]

    def run(
        self,
        input_data: Input,
        *,
        credentials: GithubCredentials,
        **kwargs,
    ) -> BlockOutput:
        id, url = self.post_comment(
            credentials,
            input_data.issue_url,
            input_data.comment,
        )
        yield "id", id
        yield "url", url
backend/integrations/oauth/github.py
class GitHubOAuthHandler(BaseOAuthHandler):
    """
    Based on the documentation at:
    - [Authorizing OAuth apps - GitHub Docs](https://docs.github.com/en/apps/oauth-apps/building-oauth-apps/authorizing-oauth-apps)
    - [Refreshing user access tokens - GitHub Docs](https://docs.github.com/en/apps/creating-github-apps/authenticating-with-a-github-app/refreshing-user-access-tokens)

    Notes:
    - By default, token expiration is disabled on GitHub Apps. This means the access
      token doesn't expire and no refresh token is returned by the authorization flow.
    - When token expiration gets enabled, any existing tokens will remain non-expiring.
    - When token expiration gets disabled, token refreshes will return a non-expiring
      access token *with no refresh token*.
    """  # noqa

    PROVIDER_NAME = ProviderName.GITHUB

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.auth_base_url = "https://github.com/login/oauth/authorize"
        self.token_url = "https://github.com/login/oauth/access_token"
        self.revoke_url = "https://api.github.com/applications/{client_id}/token"

    def get_login_url(self, scopes: list[str], state: str) -> str:
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": " ".join(scopes),
            "state": state,
        }
        return f"{self.auth_base_url}?{urlencode(params)}"

    def exchange_code_for_tokens(
        self, code: str, scopes: list[str]
    ) -> OAuth2Credentials:
        return self._request_tokens({"code": code, "redirect_uri": self.redirect_uri})

    def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
        if not credentials.access_token:
            raise ValueError("No access token to revoke")

        headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        requests.delete(
            url=self.revoke_url.format(client_id=self.client_id),
            auth=(self.client_id, self.client_secret),
            headers=headers,
            json={"access_token": credentials.access_token.get_secret_value()},
        )
        return True

    def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
        if not credentials.refresh_token:
            return credentials

        return self._request_tokens(
            {
                "refresh_token": credentials.refresh_token.get_secret_value(),
                "grant_type": "refresh_token",
            }
        )

    def _request_tokens(
        self,
        params: dict[str, str],
        current_credentials: Optional[OAuth2Credentials] = None,
    ) -> OAuth2Credentials:
        request_body = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            **params,
        }
        headers = {"Accept": "application/json"}
        response = requests.post(self.token_url, data=request_body, headers=headers)
        token_data: dict = response.json()

        username = self._request_username(token_data["access_token"])

        now = int(time.time())
        new_credentials = OAuth2Credentials(
            provider=self.PROVIDER_NAME,
            title=current_credentials.title if current_credentials else None,
            username=username,
            access_token=token_data["access_token"],
            # Token refresh responses have an empty `scope` property (see docs),
            # so we have to get the scope from the existing credentials object.
            scopes=(
                token_data.get("scope", "").split(",")
                or (current_credentials.scopes if current_credentials else [])
            ),
            # Refresh token and expiration intervals are only given if token expiration
            # is enabled in the GitHub App's settings.
            refresh_token=token_data.get("refresh_token"),
            access_token_expires_at=(
                now + expires_in
                if (expires_in := token_data.get("expires_in", None))
                else None
            ),
            refresh_token_expires_at=(
                now + expires_in
                if (expires_in := token_data.get("refresh_token_expires_in", None))
                else None
            ),
        )
        if current_credentials:
            new_credentials.id = current_credentials.id
        return new_credentials

    def _request_username(self, access_token: str) -> str | None:
        url = "https://api.github.com/user"
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {access_token}",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        response = requests.get(url, headers=headers)

        if not response.ok:
            return None

        # Get the login (username)
        return response.json().get("login")

Example: Google integration

backend/integrations/oauth/google.py
class GoogleOAuthHandler(BaseOAuthHandler):
    """
    Based on the documentation at https://developers.google.com/identity/protocols/oauth2/web-server
    """  # noqa

    PROVIDER_NAME = ProviderName.GOOGLE
    EMAIL_ENDPOINT = "https://www.googleapis.com/oauth2/v2/userinfo"
    DEFAULT_SCOPES = [
        "https://www.googleapis.com/auth/userinfo.email",
        "https://www.googleapis.com/auth/userinfo.profile",
        "openid",
    ]

You can see that google has defined a DEFAULT_SCOPES variable, this is used to set the scopes that are requested no matter what the user asks for.

backend/blocks/google/_auth.py
secrets = Secrets()
GOOGLE_OAUTH_IS_CONFIGURED = bool(
    secrets.google_client_id and secrets.google_client_secret
)

You can also see that GOOGLE_OAUTH_IS_CONFIGURED is used to disable the blocks that require OAuth if the oauth is not configured. This is in the __init__ method of each block. This is because there is no api key fallback for google blocks so we need to make sure that the oauth is configured before we allow the user to use the blocks.

Webhook-triggered Blocks

Webhook-triggered blocks allow your agent to respond to external events in real-time. These blocks are triggered by incoming webhooks from third-party services rather than being executed manually.

Creating and running a webhook-triggered block involves three main components:

  • The block itself, which specifies:
    • Inputs for the user to select a resource and events to subscribe to
    • A credentials input with the scopes needed to manage webhooks
    • Logic to turn the webhook payload into outputs for the webhook block
  • The WebhooksManager for the corresponding webhook service provider, which handles:
    • (De)registering webhooks with the provider
    • Parsing and validating incoming webhook payloads
  • The credentials system for the corresponding service provider, which may include an OAuthHandler

There is more going on under the hood, e.g. to store and retrieve webhooks and their links to nodes, but to add a webhook-triggered block you shouldn't need to make changes to those parts of the system.

Creating a Webhook-triggered Block

To create a webhook-triggered block, follow these additional steps on top of the basic block creation process:

  1. Define webhook_config in your block's __init__ method.

    Example: GitHubPullRequestTriggerBlock

    backend/blocks/github/triggers.py
    webhook_config=BlockWebhookConfig(
        provider="github",
        webhook_type=GithubWebhookType.REPO,
        resource_format="{repo}",
        event_filter_input="events",
        event_format="pull_request.{event}",
    ),
    

    BlockWebhookConfig definition

    backend/data/block.py
    class BlockWebhookConfig(BaseModel):
        provider: str
        """The service provider that the webhook connects to"""
    
        webhook_type: str
        """
        Identifier for the webhook type. E.g. GitHub has repo and organization level hooks.
    
        Only for use in the corresponding `WebhooksManager`.
        """
    
        resource_format: str
        """
        Template string for the resource that a block instance subscribes to.
        Fields will be filled from the block's inputs (except `payload`).
    
        Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
    
        Only for use in the corresponding `WebhooksManager`.
        """
    
        event_filter_input: str
        """Name of the block's event filter input."""
    
        event_format: str = "{event}"
        """
        Template string for the event(s) that a block instance subscribes to.
        Applied individually to each event selected in the event filter input.
    
        Example: `"pull_request.{event}"` -> `"pull_request.opened"`
        """
    

  2. Define event filter input in your block's Input schema. This allows the user to select which specific types of events will trigger the block in their agent.

    Example: GitHubPullRequestTriggerBlock

    backend/blocks/github/triggers.py
    class Input(GitHubTriggerBase.Input):
        class EventsFilter(BaseModel):
            """
            https://docs.github.com/en/webhooks/webhook-events-and-payloads#pull_request
            """
    
            opened: bool = False
            edited: bool = False
            closed: bool = False
            reopened: bool = False
            synchronize: bool = False
            assigned: bool = False
            unassigned: bool = False
            labeled: bool = False
            unlabeled: bool = False
            converted_to_draft: bool = False
            locked: bool = False
            unlocked: bool = False
            enqueued: bool = False
            dequeued: bool = False
            milestoned: bool = False
            demilestoned: bool = False
            ready_for_review: bool = False
            review_requested: bool = False
            review_request_removed: bool = False
            auto_merge_enabled: bool = False
            auto_merge_disabled: bool = False
    
        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )
    

    • The name of the input field (events in this case) must match webhook_config.event_filter_input.
    • The event filter itself must be a Pydantic model with only boolean fields.
  3. Include payload field in your block's Input schema.

    Example: GitHubTriggerBase

    backend/blocks/github/triggers.py
    payload: dict = SchemaField(hidden=True, default={})
    

  4. Define credentials input in your block's Input schema.

    • Its scopes must be sufficient to manage a user's webhooks through the provider's API
    • See Blocks with authentication for further details
  5. Process webhook payload and output relevant parts of it in your block's run method.

    Example: GitHubPullRequestTriggerBlock

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        yield "payload", input_data.payload
        yield "sender", input_data.payload["sender"]
        yield "event", input_data.payload["action"]
        yield "number", input_data.payload["number"]
        yield "pull_request", input_data.payload["pull_request"]
    

    Note that the credentials parameter can be omitted if the credentials aren't used at block runtime, like in the example.

Adding a Webhooks Manager

To add support for a new webhook provider, you'll need to create a WebhooksManager that implements the BaseWebhooksManager interface:

backend/integrations/webhooks/base.py
PROVIDER_NAME: ClassVar[ProviderName]

@abstractmethod
async def _register_webhook(
    self,
    credentials: Credentials,
    webhook_type: WT,
    resource: str,
    events: list[str],
    ingress_url: str,
    secret: str,
) -> tuple[str, dict]:
    """
    Registers a new webhook with the provider.

    Params:
        credentials: The credentials with which to create the webhook
        webhook_type: The provider-specific webhook type to create
        resource: The resource to receive events for
        events: The events to subscribe to
        ingress_url: The ingress URL for webhook payloads
        secret: Secret used to verify webhook payloads

    Returns:
        str: Webhook ID assigned by the provider
        config: Provider-specific configuration for the webhook
    """
    ...

@classmethod
@abstractmethod
async def validate_payload(
    cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
    """
    Validates an incoming webhook request and returns its payload and type.

    Params:
        webhook: Object representing the configured webhook and its properties in our system.
        request: Incoming FastAPI `Request`

    Returns:
        dict: The validated payload
        str: The event type associated with the payload
    """

@abstractmethod
async def _deregister_webhook(
    self, webhook: integrations.Webhook, credentials: Credentials
) -> None: ...

async def trigger_ping(
    self, webhook: integrations.Webhook, credentials: Credentials | None
) -> None:
    """
    Triggers a ping to the given webhook.

    Raises:
        NotImplementedError: if the provider doesn't support pinging
    """

And add a reference to your WebhooksManager class in WEBHOOK_MANAGERS_BY_NAME:

backend/integrations/webhooks/__init__.py
WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
    handler.PROVIDER_NAME: handler
    for handler in [
        GithubWebhooksManager,
        Slant3DWebhooksManager,
    ]
}

Example: GitHub Webhook Integration

GitHub Webhook triggers: blocks/github/triggers.py
backend/blocks/github/triggers.py
class GitHubTriggerBase:
    class Input(BlockSchema):
        credentials: GithubCredentialsInput = GithubCredentialsField("repo")
        repo: str = SchemaField(
            description=(
                "Repository to subscribe to.\n\n"
                "**Note:** Make sure your GitHub credentials have permissions "
                "to create webhooks on this repo."
            ),
            placeholder="{owner}/{repo}",
        )
        payload: dict = SchemaField(hidden=True, default={})

    class Output(BlockSchema):
        payload: dict = SchemaField(
            description="The complete webhook payload that was received from GitHub. "
            "Includes information about the affected resource (e.g. pull request), "
            "the event, and the user who triggered the event."
        )
        triggered_by_user: dict = SchemaField(
            description="Object representing the GitHub user who triggered the event"
        )
        error: str = SchemaField(
            description="Error message if the payload could not be processed"
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        yield "payload", input_data.payload
        yield "triggered_by_user", input_data.payload["sender"]


class GithubPullRequestTriggerBlock(GitHubTriggerBase, Block):
    EXAMPLE_PAYLOAD_FILE = (
        Path(__file__).parent / "example_payloads" / "pull_request.synchronize.json"
    )

    class Input(GitHubTriggerBase.Input):
        class EventsFilter(BaseModel):
            """
            https://docs.github.com/en/webhooks/webhook-events-and-payloads#pull_request
            """

            opened: bool = False
            edited: bool = False
            closed: bool = False
            reopened: bool = False
            synchronize: bool = False
            assigned: bool = False
            unassigned: bool = False
            labeled: bool = False
            unlabeled: bool = False
            converted_to_draft: bool = False
            locked: bool = False
            unlocked: bool = False
            enqueued: bool = False
            dequeued: bool = False
            milestoned: bool = False
            demilestoned: bool = False
            ready_for_review: bool = False
            review_requested: bool = False
            review_request_removed: bool = False
            auto_merge_enabled: bool = False
            auto_merge_disabled: bool = False

        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )

    class Output(GitHubTriggerBase.Output):
        event: str = SchemaField(
            description="The PR event that triggered the webhook (e.g. 'opened')"
        )
        number: int = SchemaField(description="The number of the affected pull request")
        pull_request: dict = SchemaField(
            description="Object representing the affected pull request"
        )
        pull_request_url: str = SchemaField(
            description="The URL of the affected pull request"
        )

    def __init__(self):
        from backend.integrations.webhooks.github import GithubWebhookType

        example_payload = json.loads(
            self.EXAMPLE_PAYLOAD_FILE.read_text(encoding="utf-8")
        )

        super().__init__(
            id="6c60ec01-8128-419e-988f-96a063ee2fea",
            description="This block triggers on pull request events and outputs the event type and payload.",
            categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT},
            input_schema=GithubPullRequestTriggerBlock.Input,
            output_schema=GithubPullRequestTriggerBlock.Output,
            webhook_config=BlockWebhookConfig(
                provider="github",
                webhook_type=GithubWebhookType.REPO,
                resource_format="{repo}",
                event_filter_input="events",
                event_format="pull_request.{event}",
            ),
            test_input={
                "repo": "Significant-Gravitas/AutoGPT",
                "events": {"opened": True, "synchronize": True},
                "credentials": TEST_CREDENTIALS_INPUT,
                "payload": example_payload,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("payload", example_payload),
                ("triggered_by_user", example_payload["sender"]),
                ("event", example_payload["action"]),
                ("number", example_payload["number"]),
                ("pull_request", example_payload["pull_request"]),
                ("pull_request_url", example_payload["pull_request"]["html_url"]),
            ],
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:  # type: ignore
        yield from super().run(input_data, **kwargs)
        yield "event", input_data.payload["action"]
        yield "number", input_data.payload["number"]
        yield "pull_request", input_data.payload["pull_request"]
        yield "pull_request_url", input_data.payload["pull_request"]["html_url"]
GitHub Webhooks Manager: integrations/webhooks/github.py
backend/integrations/webhooks/github.py
class GithubWebhookType(StrEnum):
    REPO = "repo"


class GithubWebhooksManager(BaseWebhooksManager):
    PROVIDER_NAME = ProviderName.GITHUB

    WebhookType = GithubWebhookType

    GITHUB_API_URL = "https://api.github.com"
    GITHUB_API_DEFAULT_HEADERS = {"Accept": "application/vnd.github.v3+json"}

    @classmethod
    async def validate_payload(
        cls, webhook: integrations.Webhook, request: Request
    ) -> tuple[dict, str]:
        if not (event_type := request.headers.get("X-GitHub-Event")):
            raise HTTPException(
                status_code=400, detail="X-GitHub-Event header is missing!"
            )

        if not (signature_header := request.headers.get("X-Hub-Signature-256")):
            raise HTTPException(
                status_code=403, detail="X-Hub-Signature-256 header is missing!"
            )

        payload_body = await request.body()
        hash_object = hmac.new(
            webhook.secret.encode("utf-8"), msg=payload_body, digestmod=hashlib.sha256
        )
        expected_signature = "sha256=" + hash_object.hexdigest()

        if not hmac.compare_digest(expected_signature, signature_header):
            raise HTTPException(
                status_code=403, detail="Request signatures didn't match!"
            )

        payload = await request.json()
        if action := payload.get("action"):
            event_type += f".{action}"

        return payload, event_type

    async def trigger_ping(
        self, webhook: integrations.Webhook, credentials: Credentials | None
    ) -> None:
        if not credentials:
            raise ValueError("Credentials are required but were not passed")

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }

        repo, github_hook_id = webhook.resource, webhook.provider_webhook_id
        ping_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{github_hook_id}/pings"

        response = requests.post(ping_url, headers=headers)

        if response.status_code != 204:
            error_msg = extract_github_error_msg(response)
            raise ValueError(f"Failed to ping GitHub webhook: {error_msg}")

    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: GithubWebhookType,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> tuple[str, dict]:
        if webhook_type == self.WebhookType.REPO and resource.count("/") > 1:
            raise ValueError("Invalid repo format: expected 'owner/repo'")

        # Extract main event, e.g. `pull_request.opened` -> `pull_request`
        github_events = list({event.split(".")[0] for event in events})

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }
        webhook_data = {
            "name": "web",
            "active": True,
            "events": github_events,
            "config": {
                "url": ingress_url,
                "content_type": "json",
                "insecure_ssl": "0",
                "secret": secret,
            },
        }

        response = requests.post(
            f"{self.GITHUB_API_URL}/repos/{resource}/hooks",
            headers=headers,
            json=webhook_data,
        )

        if response.status_code != 201:
            error_msg = extract_github_error_msg(response)
            if "not found" in error_msg.lower():
                error_msg = (
                    f"{error_msg} "
                    "(Make sure the GitHub account or API key has 'repo' or "
                    f"webhook create permissions to '{resource}')"
                )
            raise ValueError(f"Failed to create GitHub webhook: {error_msg}")

        webhook_id = response.json()["id"]
        config = response.json()["config"]

        return str(webhook_id), config

    async def _deregister_webhook(
        self, webhook: integrations.Webhook, credentials: Credentials
    ) -> None:
        webhook_type = self.WebhookType(webhook.webhook_type)
        if webhook.credentials_id != credentials.id:
            raise ValueError(
                f"Webhook #{webhook.id} does not belong to credentials {credentials.id}"
            )

        headers = {
            **self.GITHUB_API_DEFAULT_HEADERS,
            "Authorization": credentials.bearer(),
        }

        if webhook_type == self.WebhookType.REPO:
            repo = webhook.resource
            delete_url = f"{self.GITHUB_API_URL}/repos/{repo}/hooks/{webhook.provider_webhook_id}"  # noqa
        else:
            raise NotImplementedError(
                f"Unsupported webhook type '{webhook.webhook_type}'"
            )

        response = requests.delete(delete_url, headers=headers)

        if response.status_code not in [204, 404]:
            # 204 means successful deletion, 404 means the webhook was already deleted
            error_msg = extract_github_error_msg(response)
            raise ValueError(f"Failed to delete GitHub webhook: {error_msg}")

        # If we reach here, the webhook was successfully deleted or didn't exist

Key Points to Remember

  • Unique ID: Give your block a unique ID in the init method.
  • Input and Output Schemas: Define clear input and output schemas.
  • Error Handling: Implement error handling in the run method.
  • Output Results: Use yield to output results in the run method.
  • Testing: Provide test input and output in the init method for automatic testing.

Understanding the Testing Process

The testing of blocks is handled by test_block.py, which does the following:

  1. It calls the block with the provided test_input.
    If the block has a credentials field, test_credentials is passed in as well.
  2. If a test_mock is provided, it temporarily replaces the specified methods with the mock functions.
  3. It then asserts that the output matches the test_output.

For the WikipediaSummaryBlock:

  • The test will call the block with the topic "Artificial Intelligence".
  • Instead of making a real API call, it will use the mock function, which returns {"extract": "summary content"}.
  • It will then check if the output key is "summary" and its value is a string.

This approach allows us to test the block's logic comprehensively without relying on external services, while also accommodating non-deterministic outputs.

Security Best Practices for SSRF Prevention

When creating blocks that handle external URL inputs or make network requests, it's crucial to use the platform's built-in SSRF protection mechanisms. The backend.util.request module provides a secure Requests wrapper class that should be used for all HTTP requests.

Using the Secure Requests Wrapper

from backend.util.request import requests

class MyNetworkBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        try:
            # The requests wrapper automatically validates URLs and blocks dangerous requests
            response = requests.get(input_data.url)
            yield "result", response.text
        except ValueError as e:
            # URL validation failed
            raise RuntimeError(f"Invalid URL provided: {e}")
        except requests.exceptions.RequestException as e:
            # Request failed
            raise RuntimeError(f"Request failed: {e}")

The Requests wrapper provides these security features:

  1. URL Validation:

    • Blocks requests to private IP ranges (RFC 1918)
    • Validates URL format and protocol
    • Resolves DNS and checks IP addresses
    • Supports whitelisting trusted origins
  2. Secure Defaults:

    • Disables redirects by default
    • Raises exceptions for non-200 status codes
    • Supports custom headers and validators
  3. Protected IP Ranges: The wrapper denies requests to these networks:

    backend/util/request.py
    ipaddress.ip_network("0.0.0.0/8"),  # "This" Network
    ipaddress.ip_network("10.0.0.0/8"),  # Private-Use
    ipaddress.ip_network("127.0.0.0/8"),  # Loopback
    ipaddress.ip_network("169.254.0.0/16"),  # Link Local
    ipaddress.ip_network("172.16.0.0/12"),  # Private-Use
    ipaddress.ip_network("192.168.0.0/16"),  # Private-Use
    ipaddress.ip_network("224.0.0.0/4"),  # Multicast
    ipaddress.ip_network("240.0.0.0/4"),  # Reserved for Future Use
    

Custom Request Configuration

If you need to customize the request behavior:

from backend.util.request import Requests

# Create a custom requests instance with specific trusted origins
custom_requests = Requests(
    trusted_origins=["api.trusted-service.com"],
    raise_for_status=True,
    extra_headers={"User-Agent": "MyBlock/1.0"}
)

Tips for Effective Block Testing

  1. Provide realistic test_input: Ensure your test input covers typical use cases.

  2. Define appropriate test_output:

    • For deterministic outputs, use specific expected values.
    • For non-deterministic outputs or when only the type matters, use Python types (e.g., str, int, dict).
    • You can mix specific values and types, e.g., ("key1", str), ("key2", 42).
  3. Use test_mock for network calls: This prevents tests from failing due to network issues or API changes.

  4. Consider omitting test_mock for blocks without external dependencies: If your block doesn't make network calls or use external resources, you might not need a mock.

  5. Consider edge cases: Include tests for potential error conditions in your run method.

  6. Update tests when changing block behavior: If you modify your block, ensure the tests are updated accordingly.

By following these steps, you can create new blocks that extend the functionality of the AutoGPT Agent Server.

Blocks we want to see

Below is a list of blocks that we would like to see implemented in the AutoGPT Agent Server. If you're interested in contributing, feel free to pick one of these blocks or chose your own.

If you would like to implement one of these blocks, open a pull request and we will start the review process.

Consumer Services/Platforms

  • Google sheets - Read/Append
  • Email - Read/Send with Gmail, Outlook, Yahoo, Proton, etc
  • Calendar - Read/Write with Google Calendar, Outlook Calendar, etc
  • Home Assistant - Call Service, Get Status
  • Dominos - Order Pizza, Track Order
  • Uber - Book Ride, Track Ride
  • Notion - Create/Read Page, Create/Append/Read DB
  • Google drive - read/write/overwrite file/folder

Social Media

  • Twitter - Post, Reply, Get Replies, Get Comments, Get Followers, Get Following, Get Tweets, Get Mentions
  • Instagram - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
  • TikTok - Post, Reply, Get Comments, Get Followers, Get Following, Get Videos, Get Mentions, Get Trending Videos
  • LinkedIn - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
  • YouTube - Transcribe Videos/Shorts, Post Videos/Shorts, Read/Reply/React to Comments, Update Thumbnails, Update Description, Update Tags, Update Titles, Get Views, Get Likes, Get Dislikes, Get Subscribers, Get Comments, Get Shares, Get Watch Time, Get Revenue, Get Trending Videos, Get Top Videos, Get Top Channels
  • Reddit - Post, Reply, Get Comments, Get Followers, Get Following, Get Posts, Get Mentions, Get Trending Posts
  • Treatwell (and related Platforms) - Book, Cancel, Review, Get Recommendations
  • Substack - Read/Subscribe/Unsubscribe, Post/Reply, Get Recommendations
  • Discord - Read/Post/Reply, Moderation actions
  • GoodReads - Read/Post/Reply, Get Recommendations

E-commerce

  • Airbnb - Book, Cancel, Review, Get Recommendations
  • Amazon - Order, Track Order, Return, Review, Get Recommendations
  • eBay - Order, Track Order, Return, Review, Get Recommendations
  • Upwork - Post Jobs, Hire Freelancer, Review Freelancer, Fire Freelancer

Business Tools

  • External Agents - Call other agents similar to AutoGPT
  • Trello - Create/Read/Update/Delete Cards, Lists, Boards
  • Jira - Create/Read/Update/Delete Issues, Projects, Boards
  • Linear - Create/Read/Update/Delete Issues, Projects, Boards
  • Excel - Read/Write/Update/Delete Rows, Columns, Sheets
  • Slack - Read/Post/Reply to Messages, Create Channels, Invite Users
  • ERPNext - Create/Read/Update/Delete Invoices, Orders, Customers, Products
  • Salesforce - Create/Read/Update/Delete Leads, Opportunities, Accounts
  • HubSpot - Create/Read/Update/Delete Contacts, Deals, Companies
  • Zendesk - Create/Read/Update/Delete Tickets, Users, Organizations
  • Odoo - Create/Read/Update/Delete Sales Orders, Invoices, Customers
  • Shopify - Create/Read/Update/Delete Products, Orders, Customers
  • WooCommerce - Create/Read/Update/Delete Products, Orders, Customers
  • Squarespace - Create/Read/Update/Delete Pages, Products, Orders

Agent Templates we want to see

Data/Information

  • Summarize top news of today, of this week, this month via Apple News or other large media outlets BBC, TechCrunch, hackernews, etc
  • Create, read, and summarize substack newsletters or any newsletters (blog writer vs blog reader)
  • Get/read/summarize the most viral Twitter, Instagram, TikTok (general social media accounts) of the day, week, month
  • Get/Read any LinkedIn posts or profile that mention AI Agents
  • Read/Summarize discord (might not be able to do this because you need access)
  • Read / Get most read books in a given month, year, etc from GoodReads or Amazon Books, etc
  • Get dates for specific shows across all streaming services
  • Suggest/Recommend/Get most watched shows in a given month, year, etc across all streaming platforms
  • Data analysis from xlsx data set
  • Gather via Excel or Google Sheets data > Sample the data randomly (sample block takes top X, bottom X, randomly, etc) > pass that to LLM Block to generate a script for analysis of the full data > Python block to run the script> making a loop back through LLM Fix Block on error > create chart/visualization (potentially in the code block?) > show the image as output (this may require frontend changes to show)
  • Tiktok video search and download

Marketing

  • Portfolio site design and enhancements