A deep dive into Mastra AI workflows with code examples

Master the art of building robust AI applications with Mastra's TypeScript-native workflows. This guide covers everything from basic concepts to advanced patterns, complete with practical code examples.

#typescript #ai #mastra

If you’ve been following the AI development landscape, you’ve probably noticed the growing complexity of building robust AI applications. What starts as a simple LLM call quickly becomes a tangled web of prompts, error handling, and coordination logic. This is where Mastra AI workflows come in — a TypeScript-native solution that brings structure, reliability, and developer-friendly patterns to your AI applications.

In this deep dive, I’ll walk you through everything you need to know about Mastra AI workflows, from basic concepts to advanced patterns, all with practical code examples you can adapt for your own projects.

What Are Mastra AI Workflows?

At their core, Mastra workflows are graph-based state machines that help you orchestrate complex sequences of AI operations. Instead of a jumble of async/await calls and if/else branches scattered throughout your codebase, workflows let you define discrete steps with clear inputs, outputs, and execution logic.

As the team behind Mastra puts it:

“Workflows are durable graph-based state machines. They have loops, branching, wait for human input, embed other workflows, do error handling, retries, parsing and so on.”

The beauty of Mastra’s approach is that it combines the flexibility of a general-purpose programming language (TypeScript) with the structure of a workflow engine.

Getting Started with Workflows

Before diving into complex examples, let’s understand the basic building blocks of a Mastra workflow.

1. Installing Mastra

If you’re starting a new project, the easiest way is to use the CLI:

npm create mastra

The CLI will guide you through setting up a project with the components you need. Make sure to select “Workflows” when prompted.

For an existing project, you can install the core package:

npm install @mastra/core

2. Creating Your First Workflow

Let’s start with a simple workflow that takes a number as input, doubles it in the first step, and then increments the result in the second step:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Mastra } from "@mastra/core";
import { z } from "zod";

// Step 1: Double the input value
const doubleStep = createStep({
  id: "doubleStep",
  inputSchema: z.object({
    inputValue: z.number(),
  }),
  outputSchema: z.object({
    doubledValue: z.number(),
  }),
  execute: async ({ context }) => {
    const inputValue = context.inputData.inputValue;
    return {
      doubledValue: inputValue * 2,
    };
  },
});

// Step 2: Increment the doubled value
const incrementStep = createStep({
  id: "incrementStep",
  inputSchema: z.object({
    doubledValue: z.number(),
  }),
  outputSchema: z.object({
    finalValue: z.number(),
  }),
  execute: async ({ context }) => {
    const doubledValue = context.inputData.doubledValue;
    return {
      finalValue: doubledValue + 1,
    };
  },
});

// Create a workflow
const myWorkflow = createWorkflow({
  id: "my-workflow",
  inputSchema: z.object({
    inputValue: z.number(),
  }),
  outputSchema: z.object({
    finalValue: z.number(),
  }),
})
  .then(doubleStep)
  .then(incrementStep)
  .commit();

// Register with Mastra
export const mastra = new Mastra({
  workflows: {
    myWorkflow,
  },
});

3. Running the Workflow

Now, let’s run our workflow:

import { mastra } from "./path-to-your-workflow";

async function runWorkflow() {
  // Get the workflow
  const workflow = mastra.getWorkflow("myWorkflow");
  
  // Create a run
  const run = await workflow.createRunAsync();
  
  // Start the workflow with input data
  const result = await run.start({
    inputData: {
      inputValue: 5,
    },
  });
  
  console.log("Result:", result);
  // Should output something like:
  // Result: {
  //   status: "success",
  //   steps: {
  //     doubleStep: { status: "success", result: { doubledValue: 10 } },
  //     incrementStep: { status: "success", result: { finalValue: 11 } }
  //   },
  //   result: { finalValue: 11 }
  // }
}

runWorkflow();

You can also run workflows through the API if you’re using Mastra’s local development server or a deployed instance:

curl --location 'http://localhost:4111/api/workflows/myWorkflow/run' \
  --header 'Content-Type: application/json' \
  --data '{ "inputValue": 5 }'

Advanced Control Flow

Real-world applications rarely follow a simple linear path. Let’s explore how Mastra handles more complex control flow patterns.

1. Parallel Execution

Sometimes you need to run multiple steps simultaneously, especially when they don’t depend on each other:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";

// Define two independent steps
const fetchUserData = createStep({
  id: "fetchUserData",
  inputSchema: z.object({}),
  outputSchema: z.object({
    user: z.object({
      id: z.string(),
      name: z.string(),
    }),
  }),
  execute: async () => {
    // Simulate API call
    await new Promise(resolve => setTimeout(resolve, 1000));
    return {
      user: {
        id: "123",
        name: "Alice",
      },
    };
  },
});

const fetchWeatherData = createStep({
  id: "fetchWeatherData",
  inputSchema: z.object({}),
  outputSchema: z.object({
    weather: z.object({
      temperature: z.number(),
      condition: z.string(),
    }),
  }),
  execute: async () => {
    // Simulate API call
    await new Promise(resolve => setTimeout(resolve, 1000));
    return {
      weather: {
        temperature: 72,
        condition: "sunny",
      },
    };
  },
});

// Add a step that uses the results from both parallel steps
const combineResults = createStep({
  id: "combineResults",
  inputSchema: z.object({
    user: z.object({
      id: z.string(),
      name: z.string(),
    }),
    weather: z.object({
      temperature: z.number(),
      condition: z.string(),
    }),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
  execute: async ({ context }) => {
    const { user, weather } = context.inputData;
    
    return {
      message: `Hello ${user.name}, the weather is ${weather.condition} at ${weather.temperature}°F`,
    };
  },
});

const workflow = createWorkflow({
  id: "parallel-workflow",
  inputSchema: z.object({}),
  outputSchema: z.object({
    message: z.string(),
  }),
})
  .then(fetchUserData)
  .then(fetchWeatherData)
  .then(combineResults)
  .commit();

In this example, fetchUserData and fetchWeatherData will run simultaneously, and combineResults will wait for both to complete before executing.

2. Conditional Branching

Often, you need to determine the next step based on the outcome of a previous step:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";

// Step to analyze user input
const analyzeInput = createStep({
  id: "analyzeInput",
  inputSchema: z.object({
    userInput: z.string(),
  }),
  outputSchema: z.object({
    sentiment: z.enum(["positive", "negative", "neutral"]),
    topic: z.string(),
  }),
  execute: async ({ context }) => {
    const userInput = context.inputData.userInput.toLowerCase();
    
    // Very simple sentiment analysis for example purposes
    let sentiment: "positive" | "negative" | "neutral" = "neutral";
    if (userInput.includes("love") || userInput.includes("great")) {
      sentiment = "positive";
    } else if (userInput.includes("hate") || userInput.includes("bad")) {
      sentiment = "negative";
    }
    
    // Simple topic extraction
    let topic = "general";
    if (userInput.includes("weather")) {
      topic = "weather";
    } else if (userInput.includes("food")) {
      topic = "food";
    }
    
    return {
      sentiment,
      topic,
    };
  },
});

// Step for positive sentiment response
const positiveResponse = createStep({
  id: "positiveResponse",
  inputSchema: z.object({
    sentiment: z.enum(["positive", "negative", "neutral"]),
    topic: z.string(),
  }),
  outputSchema: z.object({
    response: z.string(),
  }),
  execute: async ({ context }) => {
    const { topic } = context.inputData;
    
    return {
      response: `I'm glad you feel positive about ${topic}!`,
    };
  },
});

// Step for negative sentiment response
const negativeResponse = createStep({
  id: "negativeResponse",
  inputSchema: z.object({
    sentiment: z.enum(["positive", "negative", "neutral"]),
    topic: z.string(),
  }),
  outputSchema: z.object({
    response: z.string(),
  }),
  execute: async ({ context }) => {
    const { topic } = context.inputData;
    
    return {
      response: `I'm sorry you feel negative about ${topic}.`,
    };
  },
});

// Step for neutral sentiment response
const neutralResponse = createStep({
  id: "neutralResponse",
  inputSchema: z.object({
    sentiment: z.enum(["positive", "negative", "neutral"]),
    topic: z.string(),
  }),
  outputSchema: z.object({
    response: z.string(),
  }),
  execute: async ({ context }) => {
    const { topic } = context.inputData;
    
    return {
      response: `Thanks for sharing your thoughts on ${topic}.`,
    };
  },
});

const workflow = createWorkflow({
  id: "conditional-workflow",
  inputSchema: z.object({
    userInput: z.string(),
  }),
  outputSchema: z.object({
    response: z.string(),
  }),
})
  .then(analyzeInput)
  .then(positiveResponse, {
    when: ({ context }) => {
      return context.inputData.sentiment === "positive";
    },
  })
  .then(negativeResponse, {
    when: ({ context }) => {
      return context.inputData.sentiment === "negative";
    },
  })
  .then(neutralResponse, {
    when: ({ context }) => {
      return context.inputData.sentiment === "neutral";
    },
  })
  .commit();

This workflow analyzes the sentiment of user input and then branches to different response steps based on the analysis.

3. Suspend and Resume

One of the most powerful features of Mastra workflows is the ability to suspend execution and resume it later. This is perfect for human-in-the-loop processes:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";

// Step to generate a summary
const summarizeStep = createStep({
  id: "summarizeStep",
  inputSchema: z.object({
    documentId: z.string(),
    content: z.string(),
  }),
  outputSchema: z.object({
    summary: z.string(),
  }),
  execute: async ({ context }) => {
    const { content } = context.inputData;
    
    // In a real application, this might use an LLM to generate a summary
    const summary = `Summary: ${content.substring(0, 100)}...`;
    
    return {
      summary,
    };
  },
});

// Step to request approval
const approvalStep = createStep({
  id: "approvalStep",
  inputSchema: z.object({
    summary: z.string(),
  }),
  outputSchema: z.object({
    approved: z.boolean(),
    feedback: z.string().optional(),
  }),
  execute: async ({ context, suspend }) => {
    // Suspend execution until we get human input
    await suspend();
    
    // This code will only run after the workflow is resumed with input data
    const approvalData = await context.getHumanInput();
    
    return {
      approved: approvalData.approved,
      feedback: approvalData.feedback,
    };
  },
});

// Final step based on approval
const finalizeStep = createStep({
  id: "finalizeStep",
  inputSchema: z.object({
    approved: z.boolean(),
    feedback: z.string().optional(),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
  execute: async ({ context }) => {
    const { approved, feedback } = context.inputData;
    
    if (approved) {
      return {
        message: "Document approved and finalized!",
      };
    } else {
      return {
        message: `Document rejected. Feedback: ${feedback || "No feedback provided"}`,
      };
    }
  },
});

const workflow = createWorkflow({
  id: "approval-workflow",
  inputSchema: z.object({
    documentId: z.string(),
    content: z.string(),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
})
  .then(summarizeStep)
  .then(approvalStep)
  .then(finalizeStep)
  .commit();

To run this workflow:

// Start the workflow
const run = await workflow.createRunAsync();
const initialResult = await run.start({
  inputData: {
    documentId: "doc123",
    content: "This is a sample document that needs approval...",
  },
});

console.log("Workflow suspended at:", initialResult.suspended);

// Later, when we get human input:
const resumedResult = await run.resume({
  stepId: "approvalStep",
  inputData: {
    approved: true,
    feedback: "Looks good!",
  },
});

console.log("Final result:", resumedResult);

This pattern is incredibly useful for applications that require human review, approval processes, or any workflow that can’t be completed in a single automated run.

Integrating LLMs in Workflows

Now, let’s look at how to integrate language models into your workflows. This is where Mastra really shines, as it provides a structured way to orchestrate LLM calls:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// Create an agent with OpenAI
const contentAgent = new Agent({
  name: "contentAgent",
  instructions: "You are a helpful assistant that creates content based on user requests.",
  model: openai("gpt-4o-mini"),
});

// Step to generate an outline
const outlineStep = createStep({
  id: "outlineStep",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["blog", "social", "email"]),
    tone: z.enum(["formal", "casual", "enthusiastic"]).optional(),
  }),
  outputSchema: z.object({
    outline: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    const { topic, contentType, tone = "casual" } = context.inputData;
    
    const prompt = `Create an outline for a ${contentType} about "${topic}" in a ${tone} tone.`;
    const result = await contentAgent.generate(prompt);
    
    return {
      outline: result.text,
    };
  },
});

// Step to generate full content
const contentStep = createStep({
  id: "contentStep",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["blog", "social", "email"]),
    tone: z.enum(["formal", "casual", "enthusiastic"]).optional(),
    outline: z.string(),
  }),
  outputSchema: z.object({
    content: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    const { topic, contentType, tone = "casual", outline } = context.inputData;
    
    const prompt = `Create full ${contentType} content about "${topic}" in a ${tone} tone. 
    Use this outline as a guide: ${outline}`;
    
    const result = await contentAgent.generate(prompt);
    
    return {
      content: result.text,
    };
  },
});

// Step to proofread and improve
const proofreadStep = createStep({
  id: "proofreadStep",
  inputSchema: z.object({
    content: z.string(),
  }),
  outputSchema: z.object({
    finalContent: z.string(),
  }),
  execute: async ({ context, mastra }) => {
    const { content } = context.inputData;
    
    const prompt = `Proofread and improve this content:
    
    ${content}
    
    Fix any grammar or spelling issues, improve flow, and ensure it's engaging.`;
    
    const result = await contentAgent.generate(prompt);
    
    return {
      finalContent: result.text,
    };
  },
});

// Workflow for content creation
const contentWorkflow = createWorkflow({
  id: "content-workflow",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["blog", "social", "email"]),
    tone: z.enum(["formal", "casual", "enthusiastic"]).optional(),
  }),
  outputSchema: z.object({
    finalContent: z.string(),
  }),
})
  .then(outlineStep)
  .then(contentStep)
  .then(proofreadStep)
  .commit();

// Register with Mastra
export const mastra = new Mastra({
  agents: {
    contentAgent,
  },
  workflows: {
    contentWorkflow,
  },
});

This workflow demonstrates how to break down a content creation process into logical steps with an LLM agent:

  1. Generate an outline
  2. Create full content based on the outline
  3. Proofread and improve the content

Each step builds on the previous one, creating a structured content creation pipeline.

Error Handling in Workflows

Robust error handling is crucial for production workflows. Mastra provides several patterns for handling errors:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";

// Step that might fail
const fetchUserDataStep = createStep({
  id: "fetchUserDataStep",
  inputSchema: z.object({
    userId: z.string(),
  }),
  outputSchema: z.object({
    user: z.object({
      id: z.string(),
      name: z.string(),
      email: z.string(),
    }),
  }),
  execute: async ({ context }) => {
    try {
      // Simulate an API call that might fail
      const userId = context.inputData.userId;
      
      // For demonstration, randomly simulate success or failure
      if (Math.random() < 0.5) {
        throw new Error("API error: User not found");
      }
      
      return {
        user: {
          id: userId,
          name: "Alice Smith",
          email: "[email protected]",
        },
      };
    } catch (error) {
      // Log the error for debugging
      console.error("Error fetching user data:", error);
      
      // Re-throw to mark the step as failed
      throw error;
    }
  },
});

// Success path
const processUserDataStep = createStep({
  id: "processUserDataStep",
  inputSchema: z.object({
    user: z.object({
      id: z.string(),
      name: z.string(),
      email: z.string(),
    }),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
  execute: async ({ context }) => {
    const { user } = context.inputData;
    
    return {
      message: `Hello ${user.name}, your data has been processed successfully!`,
    };
  },
});

// Fallback path for error recovery
const fallbackStep = createStep({
  id: "fallbackStep",
  inputSchema: z.object({
    error: z.string(),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
  execute: async ({ context }) => {
    return {
      message: `We couldn't process your data. Please try again later or contact support.`,
    };
  },
});

const workflow = createWorkflow({
  id: "error-handling-workflow",
  inputSchema: z.object({
    userId: z.string(),
  }),
  outputSchema: z.object({
    message: z.string(),
  }),
})
  .then(fetchUserDataStep)
  .then(processUserDataStep, {
    // Only follow this path if fetchUserDataStep succeeded
    when: ({ context }) => {
      return context.status === "success";
    },
  })
  .then(fallbackStep, {
    // Only follow this path if fetchUserDataStep failed
    when: ({ context }) => {
      return context.status === "failed";
    },
  })
  .commit();

This workflow demonstrates how to handle errors by:

  1. Using try/catch blocks within step execution
  2. Conditional branching based on step status
  3. Implementing fallback steps for error recovery

You can also monitor workflow runs for errors:

const run = await workflow.createRunAsync();

run.watch(async (event) => {
  // Check for any failed steps
  if (event.type === 'step.failed') {
    console.error(`Step ${event.stepId} failed:`, event.error);
    // Take remedial action, such as alerting or logging
  }
});

await run.start({
  inputData: {
    userId: "user123",
  },
});

Building a Multi-Agent Workflow

For complex AI applications, you often need multiple specialized agents working together. Mastra makes this easy with hierarchical multi-agent workflows:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { createTool } from "@mastra/core/tools";
import { openai } from "@ai-sdk/openai";
import { anthropic } from "@ai-sdk/anthropic";
import { z } from "zod";

// Create specialized agents
const researchAgent = new Agent({
  name: "ResearchAgent",
  instructions: "You are a research agent that collects and organizes information on given topics.",
  model: openai("gpt-4o-mini"),
});

const writingAgent = new Agent({
  name: "WritingAgent",
  instructions: "You are a writing agent that creates well-structured, engaging content.",
  model: anthropic("claude-3-5-sonnet-20241022"),
});

const editingAgent = new Agent({
  name: "EditingAgent",
  instructions: "You are an editing agent that improves writing style, grammar, and readability.",
  model: openai("gpt-4o-mini"),
});

// Create tools to access these agents from a workflow
const researchTool = createTool({
  id: "research-tool",
  description: "Research a given topic and return organized findings",
  inputSchema: z.object({
    topic: z.string().describe("The topic to research"),
  }),
  outputSchema: z.object({
    findings: z.string(),
  }),
  execute: async ({ context }) => {
    const prompt = `Research the following topic and organize your findings:
    
    TOPIC: ${context.topic}
    
    Include key points, important facts, and any relevant controversies or debates.`;
    
    const result = await researchAgent.generate(prompt);
    
    return {
      findings: result.text,
    };
  },
});

const writingTool = createTool({
  id: "writing-tool",
  description: "Create written content based on research",
  inputSchema: z.object({
    topic: z.string().describe("The main topic"),
    findings: z.string().describe("Research findings to base the writing on"),
    contentType: z.enum(["article", "blog", "report"]).describe("Type of content to create"),
  }),
  outputSchema: z.object({
    draft: z.string(),
  }),
  execute: async ({ context }) => {
    const prompt = `Create a ${context.contentType} on the following topic:
    
    TOPIC: ${context.topic}
    
    BASE YOUR WRITING ON THESE RESEARCH FINDINGS:
    ${context.findings}
    
    Create engaging, well-structured content that effectively communicates the key points.`;
    
    const result = await writingAgent.generate(prompt);
    
    return {
      draft: result.text,
    };
  },
});

const editingTool = createTool({
  id: "editing-tool",
  description: "Edit and improve written content",
  inputSchema: z.object({
    draft: z.string().describe("The draft content to edit"),
    target: z.enum(["general", "academic", "professional"]).describe("Target audience"),
  }),
  outputSchema: z.object({
    final: z.string(),
  }),
  execute: async ({ context }) => {
    const prompt = `Edit and improve the following content for a ${context.target} audience:
    
    ${context.draft}
    
    Improve clarity, flow, grammar, and style. Ensure the content is polished and professional.`;
    
    const result = await editingAgent.generate(prompt);
    
    return {
      final: result.text,
    };
  },
});

// Research step
const researchStep = createStep({
  id: "researchStep",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["article", "blog", "report"]),
    audience: z.enum(["general", "academic", "professional"]),
  }),
  outputSchema: z.object({
    findings: z.string(),
  }),
  execute: async ({ context }) => {
    const { topic } = context.inputData;
    
    const result = await researchTool.execute({
      context: {
        topic,
      },
    });
    
    return {
      findings: result.findings,
    };
  },
});

// Writing step
const writingStep = createStep({
  id: "writingStep",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["article", "blog", "report"]),
    audience: z.enum(["general", "academic", "professional"]),
    findings: z.string(),
  }),
  outputSchema: z.object({
    draft: z.string(),
  }),
  execute: async ({ context }) => {
    const { topic, contentType, findings } = context.inputData;
    
    const result = await writingTool.execute({
      context: {
        topic,
        findings,
        contentType,
      },
    });
    
    return {
      draft: result.draft,
    };
  },
});

// Editing step
const editingStep = createStep({
  id: "editingStep",
  inputSchema: z.object({
    audience: z.enum(["general", "academic", "professional"]),
    draft: z.string(),
  }),
  outputSchema: z.object({
    final: z.string(),
  }),
  execute: async ({ context }) => {
    const { audience, draft } = context.inputData;
    
    const result = await editingTool.execute({
      context: {
        draft,
        target: audience,
      },
    });
    
    return {
      final: result.final,
    };
  },
});

// Create a content production workflow
const contentWorkflow = createWorkflow({
  id: "content-production",
  inputSchema: z.object({
    topic: z.string(),
    contentType: z.enum(["article", "blog", "report"]),
    audience: z.enum(["general", "academic", "professional"]),
  }),
  outputSchema: z.object({
    final: z.string(),
  }),
})
  .then(researchStep)
  .then(writingStep)
  .then(editingStep)
  .commit();

// Register with Mastra
export const mastra = new Mastra({
  agents: {
    ResearchAgent: researchAgent,
    WritingAgent: writingAgent,
    EditingAgent: editingAgent,
  },
  workflows: {
    contentWorkflow,
  },
});

This example demonstrates a content production pipeline with specialized agents for research, writing, and editing. The workflow coordinates these agents, creating a powerful multi-agent system that leverages the strengths of each specialized component.

Testing and Debugging Workflows

One of the most powerful features of Mastra is its local development environment, which makes testing and debugging workflows much easier.

To start the local development server:

npx mastra dev

This opens a UI (typically at http://localhost:4111) where you can:

  1. Visualize your workflow graph
  2. Test your workflow with different inputs
  3. Inspect step inputs and outputs
  4. View execution logs
  5. Debug suspended workflows
  6. Monitor step status (success, failure, suspended)

For programmatic testing, you can also create tests:

import { mastra } from "./your-workflow-file";

describe("Content Workflow", () => {
  it("should generate content successfully", async () => {
    const workflow = mastra.getWorkflow("contentWorkflow");
    const run = await workflow.createRunAsync();
    
    const result = await run.start({
      inputData: {
        topic: "Artificial Intelligence Ethics",
        contentType: "article",
        audience: "general",
      },
    });
    
    expect(result.status).toBe("success");
    expect(result.steps.researchStep.status).toBe("success");
    expect(result.steps.writingStep.status).toBe("success");
    expect(result.steps.editingStep.status).toBe("success");
    expect(result.result.final).toBeTruthy();
  });
  
  it("should handle errors gracefully", async () => {
    // Mock a failing step for testing
    const workflow = mastra.getWorkflow("contentWorkflow");
    
    // Replace with a failing implementation
    const originalStep = workflow.getStep("researchStep");
    workflow.replaceStep("researchStep", {
      ...originalStep,
      execute: async () => {
        throw new Error("Simulated failure");
      },
    });
    
    const run = await workflow.createRunAsync();
    
    const result = await run.start({
      inputData: {
        topic: "Artificial Intelligence Ethics",
        contentType: "article",
        audience: "general",
      },
    });
    
    expect(result.status).toBe("failed");
    expect(result.steps.researchStep.status).toBe("failed");
    
    // Restore original implementation
    workflow.replaceStep("researchStep", originalStep);
  });
});

Deploying Workflows to Production

Mastra makes it easy to deploy your workflows to various environments:

Serverless Deployment

For platforms like Vercel, Cloudflare Workers, or Netlify:

// api/workflows/[name].ts
import { mastra } from "../../path-to-your-workflow";
import { NextRequest, NextResponse } from "next/server";

export async function POST(
  request: NextRequest,
  { params }: { params: { name: string } }
) {
  try {
    const workflowName = params.name;
    const workflow = mastra.getWorkflow(workflowName);
    
    if (!workflow) {
      return NextResponse.json(
        { error: `Workflow '${workflowName}' not found` },
        { status: 404 }
      );
    }
    
    const body = await request.json();
    const run = await workflow.createRunAsync();
    
    const result = await run.start({
      inputData: body,
    });
    
    return NextResponse.json(result);
  } catch (error) {
    console.error("Workflow error:", error);
    return NextResponse.json(
      { error: "Failed to execute workflow" },
      { status: 500 }
    );
  }
}

Node.js Server

For a standalone Node.js server:

import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { mastra } from "./path-to-your-workflow";

const app = new Hono();

// Endpoint for workflow execution
app.post("/api/workflows/:name", async (c) => {
  try {
    const workflowName = c.req.param("name");
    const workflow = mastra.getWorkflow(workflowName);
    
    if (!workflow) {
      return c.json(
        { error: `Workflow '${workflowName}' not found` },
        404
      );
    }
    
    const body = await c.req.json();
    const run = await workflow.createRunAsync();
    
    const result = await run.start({
      inputData: body,
    });
    
    return c.json(result);
  } catch (error) {
    console.error("Workflow error:", error);
    return c.json(
      { error: "Failed to execute workflow" },
      500
    );
  }
});

// Endpoint for resuming suspended workflows
app.post("/api/workflows/:name/resume", async (c) => {
  try {
    const workflowName = c.req.param("name");
    const workflow = mastra.getWorkflow(workflowName);
    
    if (!workflow) {
      return c.json(
        { error: `Workflow '${workflowName}' not found` },
        404
      );
    }
    
    const { runId, stepId, inputData } = await c.req.json();
    
    const run = await workflow.getRunAsync(runId);
    const result = await run.resume({
      stepId,
      inputData,
    });
    
    return c.json(result);
  } catch (error) {
    console.error("Resume error:", error);
    return c.json(
      { error: "Failed to resume workflow" },
      500
    );
  }
});

const port = process.env.PORT || 3000;
console.log(`Server is running on port ${port}`);

serve({
  fetch: app.fetch,
  port: Number(port),
});

Simplified Deployment with Mastra CLI

Mastra provides a deployment helper that simplifies the process:

npx mastra deploy

This command will bundle your agents and workflows and deploy them to your configured platform.

Real-World Examples and Use Cases

Now that we’ve covered the fundamentals and advanced patterns of Mastra workflows, let’s look at some real-world applications:

1. Customer Support Automation

Workflows are perfect for customer support flows that combine automation with human intervention:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// Create an agent for support tasks
const supportAgent = new Agent({
  name: "SupportAgent",
  instructions: "You are a helpful customer support agent. Your goal is to understand customer issues and provide clear, concise solutions.",
  model: openai("gpt-4o-mini"),
});

// Step 1: Analyze customer query
const analyzeQueryStep = createStep({
  id: "analyzeQueryStep",
  inputSchema: z.object({
    customerId: z.string(),
    query: z.string(),
    email: z.string().email(),
  }),
  outputSchema: z.object({
    category: z.enum([
      "billing", 
      "technical", 
      "account", 
      "product", 
      "other"
    ]),
    urgency: z.enum(["low", "medium", "high"]),
    sentiment: z.enum(["positive", "neutral", "negative"]),
    key_points: z.array(z.string()),
  }),
  execute: async ({ context }) => {
    const { query } = context.inputData;
    
    // Prompt the agent to analyze the query
    const prompt = `
      Analyze the following customer support query:
      
      "${query}"
      
      Determine:
      1. Category (billing, technical, account, product, or other)
      2. Urgency level (low, medium, high)
      3. Customer sentiment (positive, neutral, negative)
      4. Key points (extract the main issues as a list)
      
      Format your response as JSON with keys: category, urgency, sentiment, and key_points (array).
    `;
    
    const result = await supportAgent.generate(prompt);
    
    // Parse the response as JSON
    try {
      const analysis = JSON.parse(result.text);
      return {
        category: analysis.category,
        urgency: analysis.urgency,
        sentiment: analysis.sentiment,
        key_points: analysis.key_points,
      };
    } catch (error) {
      console.error("Failed to parse analysis:", error);
      // Fallback to default values if parsing fails
      return {
        category: "other",
        urgency: "medium",
        sentiment: "neutral",
        key_points: [query],
      };
    }
  },
});

// Step 2: Search knowledge base for solutions
const searchKnowledgeBaseStep = createStep({
  id: "searchKnowledgeBaseStep",
  inputSchema: z.object({
    query: z.string(),
    category: z.enum(["billing", "technical", "account", "product", "other"]),
    key_points: z.array(z.string()),
  }),
  outputSchema: z.object({
    articles: z.array(
      z.object({
        id: z.string(),
        title: z.string(),
        summary: z.string(),
        relevance: z.number(),
        url: z.string(),
      })
    ),
    has_relevant_articles: z.boolean(),
  }),
  execute: async ({ context }) => {
    const { query, category, key_points } = context.inputData;
    
    // In a real implementation, this would call your knowledge base API
    // For this example, we'll simulate the response
    
    // Create a search query based on the category and key points
    const searchTerms = [
      category,
      ...key_points,
    ].join(" ");
    
    console.log(`Searching knowledge base for: ${searchTerms}`);
    
    // Simulate API call to knowledge base
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    // Simulate a response with some relevant articles
    // In a real implementation, you would fetch actual results
    const mockArticles = [
      {
        id: "kb-123",
        title: `How to resolve ${category} issues`,
        summary: `Common solutions for ${category} problems.`,
        relevance: 0.92,
        url: `/help/${category}/overview`,
      },
      {
        id: "kb-456",
        title: `Troubleshooting guide for ${key_points[0] || "common issues"}`,
        summary: "Step-by-step guide to diagnose and fix problems.",
        relevance: 0.85,
        url: "/help/troubleshooting",
      },
      {
        id: "kb-789",
        title: "Frequently Asked Questions",
        summary: "Answers to common customer questions.",
        relevance: 0.70,
        url: "/help/faq",
      },
    ];
    
    // Determine if we have any highly relevant articles
    const hasRelevantArticles = mockArticles.some(a => a.relevance > 0.8);
    
    return {
      articles: mockArticles,
      has_relevant_articles: hasRelevantArticles,
    };
  },
});

// Step 3: Generate a response or escalate to human
const generateResponseStep = createStep({
  id: "generateResponseStep",
  inputSchema: z.object({
    query: z.string(),
    customerId: z.string(),
    category: z.enum(["billing", "technical", "account", "product", "other"]),
    urgency: z.enum(["low", "medium", "high"]),
    sentiment: z.enum(["positive", "neutral", "negative"]),
    articles: z.array(
      z.object({
        id: z.string(),
        title: z.string(),
        summary: z.string(),
        relevance: z.number(),
        url: z.string(),
      })
    ),
    has_relevant_articles: z.boolean(),
  }),
  outputSchema: z.object({
    response: z.string(),
    requires_human: z.boolean(),
    ticket_id: z.string().optional(),
  }),
  execute: async ({ context, suspend }) => {
    const { 
      query, 
      customerId, 
      category, 
      urgency, 
      sentiment, 
      articles, 
      has_relevant_articles 
    } = context.inputData;
    
    // Check if this needs human escalation based on criteria
    const needsHuman = 
      urgency === "high" || 
      sentiment === "negative" || 
      !has_relevant_articles ||
      category === "billing"; // Billing issues always go to humans
    
    if (needsHuman) {
      // Create a ticket in your ticketing system
      // In a real implementation, this would call your ticket API
      console.log(`Creating support ticket for customer ${customerId}`);
      await new Promise(resolve => setTimeout(resolve, 500));
      
      // Generate a ticket ID
      const ticketId = `TKT-${Date.now().toString().substring(5)}`;
      
      // Suspend the workflow until human input is received
      await suspend();
      
      // When resumed, we'll have human_input
      const humanInput = await context.getHumanInput();
      
      return {
        response: humanInput.response || "No response provided",
        requires_human: true,
        ticket_id: ticketId,
      };
    }
    
    // If we don't need human intervention, generate an AI response
    
    // Build a prompt that includes knowledge base articles
    let articlesContext = "";
    if (articles.length > 0) {
      articlesContext = "Here are some relevant help articles:\n\n" + 
        articles.map(article => 
          `Title: ${article.title}\nSummary: ${article.summary}\nURL: ${article.url}`
        ).join("\n\n");
    }
    
    const prompt = `
      You are responding to a customer support query.
      
      Customer query: "${query}"
      
      Category: ${category}
      Urgency: ${urgency}
      Sentiment: ${sentiment}
      
      ${articlesContext}
      
      Write a helpful, friendly response that addresses the customer's issue.
      Include links to relevant help articles if available.
      Keep the tone professional but warm.
      Sign off as "Customer Support Team".
    `;
    
    const result = await supportAgent.generate(prompt);
    
    return {
      response: result.text,
      requires_human: false,
    };
  },
});

// Step 4: Send email to customer
const sendEmailStep = createStep({
  id: "sendEmailStep",
  inputSchema: z.object({
    email: z.string().email(),
    customerId: z.string(),
    response: z.string(),
    requires_human: z.boolean(),
    ticket_id: z.string().optional(),
  }),
  outputSchema: z.object({
    email_sent: z.boolean(),
    message_id: z.string().optional(),
  }),
  execute: async ({ context }) => {
    const { email, customerId, response, requires_human, ticket_id } = context.inputData;
    
    // Prepare email content
    let subject = "Your Support Request";
    let ticketReference = "";
    
    if (requires_human && ticket_id) {
      subject = `Your Support Request (Ticket #${ticket_id})`;
      ticketReference = `\n\nFor future reference, your ticket number is: ${ticket_id}`;
    }
    
    const emailBody = `
      Dear Customer,
      
      ${response}
      ${ticketReference}
      
      Thank you for contacting us.
      
      Customer Support Team
    `;
    
    // In a real implementation, this would call your email API
    console.log(`Sending email to ${email}`);
    console.log(`Subject: ${subject}`);
    console.log(`Body: ${emailBody}`);
    
    // Simulate API call to email service
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    // Simulate a message ID returned by the email service
    const messageId = `msg_${Date.now().toString(36)}`;
    
    return {
      email_sent: true,
      message_id: messageId,
    };
  },
});

// Step 5: Update CRM with interaction details
const updateCrmStep = createStep({
  id: "updateCrmStep",
  inputSchema: z.object({
    customerId: z.string(),
    query: z.string(),
    category: z.enum(["billing", "technical", "account", "product", "other"]),
    urgency: z.enum(["low", "medium", "high"]),
    sentiment: z.enum(["positive", "neutral", "negative"]),
    response: z.string(),
    requires_human: z.boolean(),
    ticket_id: z.string().optional(),
    email_sent: z.boolean(),
    message_id: z.string().optional(),
  }),
  outputSchema: z.object({
    crm_updated: z.boolean(),
    interaction_id: z.string(),
  }),
  execute: async ({ context }) => {
    const { 
      customerId, 
      query, 
      category, 
      urgency, 
      sentiment, 
      response, 
      requires_human, 
      ticket_id, 
      email_sent, 
      message_id 
    } = context.inputData;
    
    // Prepare CRM update data
    const crmData = {
      customer_id: customerId,
      timestamp: new Date().toISOString(),
      interaction_type: "support_request",
      category,
      urgency,
      sentiment,
      query,
      response,
      handled_by: requires_human ? "human" : "ai",
      ticket_id,
      email_sent,
      email_message_id: message_id,
    };
    
    // In a real implementation, this would call your CRM API
    console.log(`Updating CRM for customer ${customerId}`);
    console.log("CRM data:", crmData);
    
    // Simulate API call to CRM
    await new Promise(resolve => setTimeout(resolve, 800));
    
    // Simulate an interaction ID returned by the CRM
    const interactionId = `int_${Date.now().toString(36)}`;
    
    return {
      crm_updated: true,
      interaction_id: interactionId,
    };
  },
});

// Create and configure the workflow
const supportWorkflow = createWorkflow({
  id: "support-workflow",
  inputSchema: z.object({
    customerId: z.string(),
    query: z.string(),
    email: z.string().email(),
  }),
  outputSchema: z.object({
    crm_updated: z.boolean(),
    interaction_id: z.string(),
  }),
})
  .then(analyzeQueryStep)
  .then(searchKnowledgeBaseStep)
  .then(generateResponseStep)
  .then(sendEmailStep)
  .then(updateCrmStep)
  .commit();

2. Content Moderation

Workflows can orchestrate sophisticated content moderation pipelines:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { createTool } from "@mastra/core/tools";

// Create an agent for moderation tasks
const moderationAgent = new Agent({
  name: "ModerationAgent",
  instructions: "You are a content moderation assistant. Your job is to identify potentially problematic content such as hate speech, harassment, self-harm, sexual content, violence, or illegal activities.",
  model: openai("gpt-4o-mini"),
});

// Create a tool for toxicity classification using OpenAI's moderation API
const toxicityTool = createTool({
  id: "toxicity-analyzer",
  description: "Analyze text for potentially harmful content using OpenAI's moderation API",
  inputSchema: z.object({
    text: z.string().describe("The text to analyze for harmful content"),
  }),
  outputSchema: z.object({
    flagged: z.boolean(),
    categories: z.object({
      hate: z.boolean(),
      harassment: z.boolean(),
      self_harm: z.boolean(),
      sexual: z.boolean(),
      violence: z.boolean(),
      illegal: z.boolean(),
    }),
    category_scores: z.object({
      hate: z.number(),
      harassment: z.number(),
      self_harm: z.number(),
      sexual: z.number(),
      violence: z.number(),
      illegal: z.number(),
    }),
  }),
  execute: async ({ context }) => {
    // In a real implementation, you'd call OpenAI's moderation API
    // Here we'll simulate a response
    
    const text = context.text.toLowerCase();
    
    // Simple keyword-based check for demonstration
    const hateWords = ["hate", "awful", "terrible", "stupid"];
    const harassmentWords = ["loser", "idiot", "stupid"];
    const selfHarmWords = ["kill myself", "suicide", "end it all"];
    const sexualWords = ["sex", "pornography", "naked"];
    const violenceWords = ["kill", "attack", "hurt", "fight"];
    const illegalWords = ["illegal", "drugs", "hack", "steal"];
    
    // Calculate simple scores based on keyword presence
    const getScore = (words: string[]) => {
      return words.some(word => text.includes(word)) ? 
        Math.random() * 0.5 + 0.5 : // Random high score if keyword found
        Math.random() * 0.3; // Random low score otherwise
    };
    
    const hateScore = getScore(hateWords);
    const harassmentScore = getScore(harassmentWords);
    const selfHarmScore = getScore(selfHarmWords);
    const sexualScore = getScore(sexualWords);
    const violenceScore = getScore(violenceWords);
    const illegalScore = getScore(illegalWords);
    
    // Determine if content should be flagged (any score > 0.7)
    const flagged = 
      hateScore > 0.7 || 
      harassmentScore > 0.7 || 
      selfHarmScore > 0.7 || 
      sexualScore > 0.7 || 
      violenceScore > 0.7 || 
      illegalScore > 0.7;
    
    return {
      flagged,
      categories: {
        hate: hateScore > 0.7,
        harassment: harassmentScore > 0.7,
        self_harm: selfHarmScore > 0.7,
        sexual: sexualScore > 0.7,
        violence: violenceScore > 0.7,
        illegal: illegalScore > 0.7,
      },
      category_scores: {
        hate: hateScore,
        harassment: harassmentScore,
        self_harm: selfHarmScore,
        sexual: sexualScore,
        violence: violenceScore,
        illegal: illegalScore,
      },
    };
  },
});

// Step 1: Check against explicit word list
const explicitWordCheckStep = createStep({
  id: "explicitWordCheckStep",
  inputSchema: z.object({
    contentId: z.string(),
    content: z.string(),
    contentType: z.enum(["text", "image", "video"]),
    userId: z.string(),
  }),
  outputSchema: z.object({
    contains_explicit_words: z.boolean(),
    flagged_words: z.array(z.string()),
    passed: z.boolean(),
  }),
  execute: async ({ context }) => {
    const { content } = context.inputData;
    
    // List of explicit words to check against
    // This would be much more comprehensive in a real system
    const explicitWords = [
      "explicit1", "explicit2", "profanity1", "profanity2", "slur1", "slur2"
    ];
    
    // Check if content contains any explicit words
    const lowerContent = content.toLowerCase();
    const flaggedWords = explicitWords.filter(word => 
      lowerContent.includes(word.toLowerCase())
    );
    
    const containsExplicitWords = flaggedWords.length > 0;
    
    // Automatic rejection if explicit words are found
    const passed = !containsExplicitWords;
    
    return {
      contains_explicit_words: containsExplicitWords,
      flagged_words: flaggedWords,
      passed,
    };
  },
});

// Step 2: Run toxicity analysis with LLM
const toxicityAnalysisStep = createStep({
  id: "toxicityAnalysisStep",
  inputSchema: z.object({
    content: z.string(),
    passed: z.boolean(),
  }),
  outputSchema: z.object({
    toxicity_results: z.object({
      flagged: z.boolean(),
      categories: z.object({
        hate: z.boolean(),
        harassment: z.boolean(),
        self_harm: z.boolean(),
        sexual: z.boolean(),
        violence: z.boolean(),
        illegal: z.boolean(),
      }),
      category_scores: z.object({
        hate: z.number(),
        harassment: z.number(),
        self_harm: z.number(),
        sexual: z.number(),
        violence: z.number(),
        illegal: z.number(),
      }),
    }),
    passed: z.boolean(),
    moderation_score: z.number(),
  }),
  execute: async ({ context }) => {
    // Skip if the content already failed explicit word check
    const { passed, content } = context.inputData;
    
    if (!passed) {
      // Content already failed, no need for further analysis
      return {
        toxicity_results: {
          flagged: true,
          categories: {
            hate: false,
            harassment: false,
            self_harm: false,
            sexual: false,
            violence: false,
            illegal: false,
          },
          category_scores: {
            hate: 0,
            harassment: 0,
            self_harm: 0,
            sexual: 0,
            violence: 0,
            illegal: 0,
          },
        },
        passed: false,
        moderation_score: 0, // Lowest possible score
      };
    }
    
    // Call toxicity analysis tool
    const toxicityResults = await toxicityTool.execute({
      context: {
        text: content,
      },
    });
    
    // Calculate an overall moderation score (0-1, higher is better/safer)
    // This is a simple average of (1 - category scores)
    const scores = Object.values(toxicityResults.category_scores);
    const averageToxicity = scores.reduce((sum, score) => sum + score, 0) / scores.length;
    const moderationScore = 1 - averageToxicity;
    
    // Determine if content passes based on moderation score
    // Scores below 0.7 are flagged for human review
    const passedAnalysis = !toxicityResults.flagged && moderationScore >= 0.7;
    
    return {
      toxicity_results: toxicityResults,
      passed: passedAnalysis,
      moderation_score: moderationScore,
    };
  },
});

// Step 3: Branch based on moderation score
const contentDecisionStep = createStep({
  id: "contentDecisionStep",
  inputSchema: z.object({
    contains_explicit_words: z.boolean(),
    flagged_words: z.array(z.string()),
    toxicity_results: z.object({
      flagged: z.boolean(),
      categories: z.record(z.boolean()),
      category_scores: z.record(z.number()),
    }),
    passed: z.boolean(),
    moderation_score: z.number(),
  }),
  outputSchema: z.object({
    decision: z.enum(["approve", "reject", "review"]),
    reason: z.string(),
    confidence: z.number(),
  }),
  execute: async ({ context }) => {
    const { 
      contains_explicit_words, 
      flagged_words, 
      toxicity_results, 
      moderation_score 
    } = context.inputData;
    
    // Automatic rejection if explicit words were found
    if (contains_explicit_words) {
      return {
        decision: "reject",
        reason: `Content contains explicit words: ${flagged_words.join(", ")}`,
        confidence: 1.0, // High confidence
      };
    }
    
    // Determine decision based on toxicity results
    if (toxicity_results.flagged) {
      // Get the categories that were flagged
      const flaggedCategories = Object.entries(toxicity_results.categories)
        .filter(([_, flagged]) => flagged)
        .map(([category]) => category);
      
      return {
        decision: "reject",
        reason: `Content flagged for: ${flaggedCategories.join(", ")}`,
        confidence: 0.9, // High confidence
      };
    }
    
    // If moderation score is high enough, approve
    if (moderation_score >= 0.8) {
      return {
        decision: "approve",
        reason: "Content passed all moderation checks",
        confidence: moderation_score,
      };
    }
    
    // Otherwise, send for human review
    return {
      decision: "review",
      reason: `Moderation score (${moderation_score.toFixed(2)}) is below automatic approval threshold`,
      confidence: 1 - moderation_score, // Confidence in needing review
    };
  },
});

// Step 4: Human review for borderline cases
const humanReviewStep = createStep({
  id: "humanReviewStep",
  inputSchema: z.object({
    contentId: z.string(),
    content: z.string(),
    contentType: z.enum(["text", "image", "video"]),
    userId: z.string(),
    decision: z.enum(["approve", "reject", "review"]),
  }),
  outputSchema: z.object({
    decision: z.enum(["approve", "reject"]),
    reason: z.string(),
    reviewer_id: z.string().optional(),
    review_notes: z.string().optional(),
  }),
  execute: async ({ context, suspend }) => {
    const { decision, contentId, content, contentType, userId } = context.inputData;
    
    // If the decision isn't to review, this step should be skipped
    if (decision !== "review") {
      return {
        decision: decision as "approve" | "reject",
        reason: "Automatic decision, no human review needed",
      };
    }
    
    // In a real implementation, you would:
    // 1. Add the content to a review queue
    // 2. Notify moderators
    // 3. Provide a UI for moderators to review and make decisions
    
    console.log(`Content ${contentId} sent for human review`);
    console.log(`Content type: ${contentType}`);
    console.log(`Content: ${content.substring(0, 100)}...`);
    
    // Suspend the workflow until human review is completed
    await suspend();
    
    // When the workflow is resumed, we'll have the human decision
    const humanInput = await context.getHumanInput();
    
    return {
      decision: humanInput.review_decision || "reject", // Default to reject if missing
      reason: "Human review decision",
      reviewer_id: "mod_123", // In a real system, this would be the actual moderator ID
      review_notes: humanInput.review_notes,
    };
  },
});

// Step 5: Apply moderation decision
const applyModerationStep = createStep({
  id: "applyModerationStep",
  inputSchema: z.object({
    contentId: z.string(),
    userId: z.string(),
    decision: z.enum(["approve", "reject"]),
    reason: z.string(),
    reviewer_id: z.string().optional(),
    review_notes: z.string().optional(),
  }),
  outputSchema: z.object({
    content_id: z.string(),
    user_id: z.string(),
    final_decision: z.enum(["approve", "reject"]),
    action_taken: z.string(),
    notification_sent: z.boolean(),
  }),
  execute: async ({ context }) => {
    const { 
      contentId, 
      userId, 
      decision, 
      reason, 
      reviewer_id, 
      review_notes 
    } = context.inputData;
    
    // Determine the final decision and action taken
    const finalDecision = decision;
    let actionTaken = `Content ${finalDecision}ed. Reason: ${reason}`;
    
    if (reviewer_id) {
      actionTaken = `Human moderator (${reviewer_id}) ${finalDecision}ed the content. Reason: ${reason}`;
      if (review_notes) {
        actionTaken += ` Notes: ${review_notes}`;
      }
    }
    
    // In a real implementation, you would:
    // 1. Update the content status in your database
    // 2. Make the content visible or hide it based on the decision
    // 3. Apply any necessary tags or flags
    
    console.log(`Applying moderation decision for content ${contentId}`);
    console.log(`Decision: ${finalDecision}`);
    console.log(`Action: ${actionTaken}`);
    
    // Simulate API call to update content status
    await new Promise(resolve => setTimeout(resolve, 500));
    
    // Simulate notification to the user
    let notificationSent = false;
    
    if (finalDecision === "reject") {
      console.log(`Sending rejection notification to user ${userId}`);
      // Simulate API call to send notification
      await new Promise(resolve => setTimeout(resolve, 300));
      notificationSent = true;
    }
    
    return {
      content_id: contentId,
      user_id: userId,
      final_decision: finalDecision,
      action_taken: actionTaken,
      notification_sent: notificationSent,
    };
  },
});

// Create the moderation workflow
const moderationWorkflow = createWorkflow({
  id: "moderation-workflow",
  inputSchema: z.object({
    contentId: z.string(),
    content: z.string(),
    contentType: z.enum(["text", "image", "video"]),
    userId: z.string(),
  }),
  outputSchema: z.object({
    content_id: z.string(),
    user_id: z.string(),
    final_decision: z.enum(["approve", "reject"]),
    action_taken: z.string(),
    notification_sent: z.boolean(),
  }),
})
  .then(explicitWordCheckStep)
  .then(toxicityAnalysisStep)
  .then(contentDecisionStep)
  .then(humanReviewStep, {
    // Only go to human review if the decision is "review"
    when: ({ context }) => {
      return context.inputData.decision === "review";
    },
  })
  .then(applyModerationStep)
  .commit();

Performance Considerations

As your Mastra workflows become more complex, consider these performance optimizations:

  1. Parallel Execution: Use parallel steps whenever possible for operations that don’t depend on each other.

  2. Caching: Implement caching for expensive operations, especially LLM calls:

import NodeCache from "node-cache";

// Simple cache with 30-minute TTL
const llmCache = new NodeCache({ stdTTL: 1800 });

// Example of cached LLM call
async function cachedLlmCall(prompt: string, model: string) {
  // Create a cache key based on prompt and model
  const cacheKey = `${model}:${prompt}`;
  
  // Check if we have a cached result
  const cachedResult = llmCache.get(cacheKey);
  if (cachedResult) {
    console.log("Cache hit for LLM call");
    return cachedResult;
  }
  
  // If not cached, make the actual call
  console.log("Cache miss for LLM call");
  let result;
  
  if (model.startsWith("gpt")) {
    result = await openai(model).generate(prompt);
  } else if (model.startsWith("claude")) {
    result = await anthropic(model).generate(prompt);
  } else {
    throw new Error(`Unsupported model: ${model}`);
  }
  
  // Cache the result
  llmCache.set(cacheKey, result);
  
  return result;
}
  1. Step Timeouts: Add timeouts to steps that might take too long:
// Example of a step with timeout
const timeoutStep = createStep({
  id: "timeoutStep",
  inputSchema: z.object({}),
  outputSchema: z.object({
    result: z.string(),
  }),
  execute: async ({ context }) => {
    return await Promise.race([
      yourActualFunction(),
      new Promise((_, reject) => 
        setTimeout(() => reject(new Error("Step timed out")), 30000)
      ),
    ]);
  },
});
  1. Resource Cleanup: Ensure resources are properly cleaned up, especially in error cases:
const cleanupStep = createStep({
  id: "cleanupStep",
  inputSchema: z.object({}),
  outputSchema: z.object({
    result: z.string(),
  }),
  execute: async ({ context }) => {
    try {
      // Your step logic here
      return { result: "success" };
    } finally {
      // Cleanup logic that runs even if there's an error
      await releaseResources();
    }
  },
});

Monitoring and Observability

For production workflows, monitoring is crucial. Mastra integrates with observability tools:

import { Mastra } from "@mastra/core";
import { PinoLogger } from "@mastra/loggers";

// Initialize Mastra with telemetry
const mastra = new Mastra({
  workflows: {
    // Your workflows here
  },
  telemetry: {
    serviceName: "my-mastra-app",
    enabled: true,
    sampling: {
      type: "ratio",
      probability: 0.5,
    },
    export: {
      type: "otlp",
      endpoint: process.env.OTEL_EXPORTER_OTLP_ENDPOINT,
    },
  },
  logger: new PinoLogger({
    name: "MyApp",
    level: "info",
  }),
});

This setup will send workflow step metrics to your observability platform, allowing you to:

  • Track workflow completion rates
  • Monitor step execution times
  • Set up alerts for failed workflows
  • Visualize workflow performance over time

What’s Next?

Mastra workflows provide a powerful, type-safe way to orchestrate complex AI operations. By breaking down complex processes into discrete steps with clear inputs, outputs, and execution logic, workflows make your AI applications more maintainable, testable, and reliable.

The combination of strong TypeScript typing, flexible control flow, and seamless integration with LLMs makes Mastra an excellent choice for building sophisticated AI applications. Whether you’re building customer support automation, content moderation systems, or complex document processing pipelines, workflows give you the structure and capabilities you need.

Start simple with basic linear workflows, then gradually incorporate more advanced patterns like conditional branching, parallel execution, and human-in-the-loop processes as your applications mature. With Mastra’s local development environment and comprehensive testing tools, you can iterate quickly and confidently.

Remember that workflows are just one part of the Mastra ecosystem. Combined with agents, tools, RAG, and evals, they form a complete platform for building production-ready AI applications.

Happy building with Mastra workflows!

Resources