Developer's guide - Testing
The Testing section of the Temporal Application development guide describes the frameworks that facilitate Workflow and integration testing.
In the context of Temporal, you can create these types of automated tests:
- End-to-end: Running a Temporal Server and Worker with all its Workflows and Activities; starting and interacting with Workflows from a Client.
- Integration: Anything between end-to-end and unit testing.
  - Running Activities with mocked Context and other SDK imports (and usually network requests).
  - Running Workers with mock Activities, and using a Client to start Workflows.
  - Running Workflows with mocked SDK imports.
- Unit: Running a piece of Workflow or Activity code (a function or method) and mocking any code it calls.
We generally recommend writing the majority of your tests as integration tests.
Because the test server supports skipping time, use the test server for both end-to-end and integration tests with Workers.
Test frameworks
Some SDKs have support or examples for popular test frameworks, runners, or libraries.
Go, Java, PHP
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Python
One recommended framework for testing in Python for the Temporal SDK is pytest, which can help with fixtures to stand up and tear down test environments, provide useful test discovery, and make it easy to write parameterized tests.
TypeScript
TypeScript has sample tests for Jest and Mocha.
Jest
- Minimum Jest version: 27.0.0
- Sample test file
- jest.config.js (must use testEnvironment: 'node'; testEnvironment: 'jsdom' is not supported)
Mocha
- Sample test file
- Test coverage library: @temporalio/nyc-test-coverage
Test Activities
An Activity can be tested with a mock Activity environment, which provides a way to mock the Activity context, listen to Heartbeats, and cancel the Activity. This behavior allows you to test the Activity in isolation by calling it directly, without needing to create a Worker to run the Activity.
Run an Activity
If an Activity references its context, you need to mock that context when testing in isolation.
Go, Java, PHP
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Python
To run an Activity in a test, use the ActivityEnvironment class.
This class allows you to run any callable inside an Activity context. Use it to test the behavior of your code under various conditions.
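To make the idea of an Activity context concrete, the following is a minimal, SDK-free sketch of the pattern a test environment relies on. Every name in it (FakeInfo, current_info, run_in_fake_context, add_with_attempt) is hypothetical and is not part of the Temporal Python SDK. It only illustrates why code that reads its context fails when called directly, and works once a test harness installs a context first.

```python
import contextvars
from dataclasses import dataclass


# Hypothetical stand-in for the info an Activity can read from its context.
@dataclass(frozen=True)
class FakeInfo:
    attempt: int = 1


# Holds the "current" info; it is unset outside of a (fake) Activity context.
_current_info = contextvars.ContextVar("fake_info")


def current_info() -> FakeInfo:
    """Return the info of the running (fake) Activity, or raise if there is none."""
    try:
        return _current_info.get()
    except LookupError:
        raise RuntimeError("not in an Activity context") from None


def run_in_fake_context(info: FakeInfo, fn, *args):
    """Install `info` as the current context, call `fn`, then restore the old state."""
    token = _current_info.set(info)
    try:
        return fn(*args)
    finally:
        _current_info.reset(token)


# Code under test: reads the attempt number from its context.
def add_with_attempt(a: int, b: int) -> int:
    return a + b + current_info().attempt


# Run the function with attempt set to 2, as a test harness would.
result = run_in_fake_context(FakeInfo(attempt=2), add_with_attempt, 5, 35)
```

Calling add_with_attempt(5, 35) directly, outside run_in_fake_context, raises RuntimeError in this sketch, which is the situation a mock environment is meant to avoid.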
TypeScript
First, create a MockActivityEnvironment.
The constructor accepts an optional partial Activity Info object in case any info fields are needed for the test.
Then use MockActivityEnvironment.run() to run a function in an Activity Context.
- TypeScript
import { Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

// A function that takes two numbers and returns a promise that resolves to the sum of the two numbers
// and the current attempt.
async function activityFoo(a: number, b: number): Promise<number> {
  return a + b + Context.current().info.attempt;
}

// Create a MockActivityEnvironment with attempt set to 2. Run the activityFoo
// function with parameters 5 and 35. Assert that the result is 42.
const env = new MockActivityEnvironment({ attempt: 2 });
const result = await env.run(activityFoo, 5, 35);
assert.equal(result, 42);
- JavaScript
import { Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

// A function that takes two numbers and returns a promise that resolves to the sum of the two numbers
// and the current attempt.
async function activityFoo(a, b) {
  return a + b + Context.current().info.attempt;
}

// Create a MockActivityEnvironment with attempt set to 2. Run the activityFoo
// function with parameters 5 and 35. Assert that the result is 42.
const env = new MockActivityEnvironment({ attempt: 2 });
const result = await env.run(activityFoo, 5, 35);
assert.equal(result, 42);
Listen to Heartbeats
When an Activity sends a Heartbeat, be sure that you can see the Heartbeats in your test code so that you can verify them.
Go, Java, PHP
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Python
To test a Heartbeat in an Activity, use the on_heartbeat property of the ActivityEnvironment class.
This property sets a custom function that is called every time the activity.heartbeat() function is called within the Activity.
from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def activity_with_heartbeats(param: str):
    activity.heartbeat(f"param: {param}")
    activity.heartbeat("second heartbeat")

async def test_activity_heartbeats():
    env = ActivityEnvironment()
    heartbeats = []
    # Set the `on_heartbeat` property to a callback function that will be called for each Heartbeat sent by the Activity.
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])
    # Use the run method to start the Activity, passing in the function that contains the Heartbeats and any necessary parameters.
    await env.run(activity_with_heartbeats, "test")
    # Verify that the expected Heartbeats are received by the callback function.
    assert heartbeats == ["param: test", "second heartbeat"]
TypeScript
MockActivityEnvironment is an EventEmitter that emits a heartbeat event that you can use to listen for Heartbeats emitted by the Activity.
When an Activity is run by a Worker, Heartbeats are throttled to avoid overloading the server.
MockActivityEnvironment, however, does not throttle Heartbeats.
- TypeScript
import { Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

async function activityFoo(): Promise<void> {
  Context.current().heartbeat(6);
}

const env = new MockActivityEnvironment();
env.on('heartbeat', (d: unknown) => {
  assert(d === 6);
});
await env.run(activityFoo);
- JavaScript
import { Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

async function activityFoo() {
  Context.current().heartbeat(6);
}

const env = new MockActivityEnvironment();
env.on('heartbeat', (d) => {
  assert(d === 6);
});
await env.run(activityFoo);
Cancel an Activity
If an Activity is supposed to react to a Cancellation, you can test whether it reacts correctly by canceling it.
Go, Java, PHP, Python
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
TypeScript
MockActivityEnvironment exposes a .cancel() method that cancels the Activity Context.
- TypeScript
import { CancelledFailure, Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

async function activityFoo(): Promise<void> {
  Context.current().heartbeat(6);
  // .sleep() is Cancellation-aware, which means that on Cancellation,
  // CancelledFailure will be thrown from it.
  await Context.current().sleep(100);
}

const env = new MockActivityEnvironment();
env.on('heartbeat', (d: unknown) => {
  assert(d === 6);
  // Cancel the Activity as soon as its Heartbeat arrives.
  env.cancel();
});
await assert.rejects(env.run(activityFoo), (err) => {
  assert.ok(err instanceof CancelledFailure);
  // A validation function must return true to signal success.
  return true;
});
- JavaScript
import { CancelledFailure, Context } from '@temporalio/activity';
import { MockActivityEnvironment } from '@temporalio/testing';
import assert from 'assert';

async function activityFoo() {
  Context.current().heartbeat(6);
  // .sleep() is Cancellation-aware, which means that on Cancellation,
  // CancelledFailure will be thrown from it.
  await Context.current().sleep(100);
}

const env = new MockActivityEnvironment();
env.on('heartbeat', (d) => {
  assert(d === 6);
  // Cancel the Activity as soon as its Heartbeat arrives.
  env.cancel();
});
await assert.rejects(env.run(activityFoo), (err) => {
  assert.ok(err instanceof CancelledFailure);
  // A validation function must return true to signal success.
  return true;
});
Test Workflows
Mock Activities
Mock the Activity invocation when unit testing your Workflows.
When integration testing Workflows with a Worker, you can mock Activities by providing mock Activity implementations to the Worker.
Go, Java
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
PHP
RoadRunner config
To mock an Activity in PHP, use RoadRunner Key-Value storage and add the following lines to your tests/.rr.test.yaml file.
# tests/.rr.test.yaml
kv:
  test:
    driver: memory
    config:
      interval: 10
If you want to be able to mock Activities, use WorkerFactory from the Temporal\Testing Namespace in your PHP Worker:
// worker.test.php
use Temporal\Testing\WorkerFactory;
$factory = WorkerFactory::create();
$worker = $factory->newWorker();
$worker->registerWorkflowTypes(MyWorkflow::class);
$worker->registerActivity(MyActivity::class);
$factory->run();
Then, in your tests, use the ActivityMocker class to mock an Activity.
Assume we have the following Activity:
#[ActivityInterface(prefix: "SimpleActivity.")]
interface SimpleActivityInterface
{
    #[ActivityMethod('doSomething')]
    public function doSomething(string $input): string;
}
To mock it in the test, you can do this:
final class SimpleWorkflowTestCase extends TestCase
{
    private WorkflowClient $workflowClient;
    private ActivityMocker $activityMocks;

    protected function setUp(): void
    {
        $this->workflowClient = new WorkflowClient(ServiceClient::create('localhost:7233'));
        $this->activityMocks = new ActivityMocker();
        parent::setUp();
    }

    protected function tearDown(): void
    {
        $this->activityMocks->clear();
        parent::tearDown();
    }

    public function testWorkflowReturnsUpperCasedInput(): void
    {
        $this->activityMocks->expectCompletion('SimpleActivity.doSomething', 'world');
        $workflow = $this->workflowClient->newWorkflowStub(SimpleWorkflow::class);
        $run = $this->workflowClient->start($workflow, 'hello');
        $this->assertSame('world', $run->getResult('string'));
    }
}
In the preceding test case, we do the following:
- Instantiate ActivityMocker in the setUp() method of the test.
- Clear the cache after each test in tearDown().
- Mock an Activity call to return the string world.
To mock a failure, use the expectFailure() method:
$this->activityMocks->expectFailure('SimpleActivity.echo', new \LogicException('something went wrong'));
Python
Provide mock Activity implementations to the Worker.
import uuid

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

# Import your Activity Definition and real implementation
from hello.hello_activity import (
    ComposeGreetingInput,
    GreetingWorkflow,
    compose_greeting,
)

# Define your mocked Activity implementation
@activity.defn(name="compose_greeting")
async def compose_greeting_mocked(input: ComposeGreetingInput) -> str:
    return f"{input.greeting}, {input.name} from mocked activity!"

async def test_mock_activity(client: Client):
    task_queue_name = str(uuid.uuid4())
    # Provide the mocked Activity implementation to the Worker
    async with Worker(
        client,
        task_queue=task_queue_name,
        workflows=[GreetingWorkflow],
        activities=[compose_greeting_mocked],
    ):
        # Execute your Workflow as usual
        assert "Hello, World from mocked activity!" == await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id=str(uuid.uuid4()),
            task_queue=task_queue_name,
        )
The mocked Activity implementation should have the same name and the same signature (including the input and output types) as the real implementation. When the Workflow invokes the Activity, it invokes the mocked implementation instead of the real one, which lets you test your Workflow in isolation.
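A mismatch between a mock and the real Activity tends to surface only when the Workflow runs, so it can help to compare signatures up front. The helper below is a hypothetical convenience, not part of the Temporal SDK: it uses the standard library's inspect module to compare parameters and return annotations. The compose_greeting functions here are simplified stand-ins without the @activity.defn decorator, and compose_greeting_bad_mock is an invented counterexample.

```python
import inspect
from dataclasses import dataclass


@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str


# Simplified stand-ins for the real and the mocked Activity functions.
async def compose_greeting(input: ComposeGreetingInput) -> str:
    return f"{input.greeting}, {input.name}!"


async def compose_greeting_mocked(input: ComposeGreetingInput) -> str:
    return f"{input.greeting}, {input.name} from mocked activity!"


# A mock whose parameters drifted away from the real implementation.
async def compose_greeting_bad_mock(name: str) -> str:
    return f"Hello, {name}!"


def same_signature(real, mock) -> bool:
    """Return True if both callables have identical parameters and return annotation."""
    return inspect.signature(real) == inspect.signature(mock)


ok = same_signature(compose_greeting, compose_greeting_mocked)
bad = same_signature(compose_greeting, compose_greeting_bad_mock)
```

In this sketch, ok is True and bad is False. A check like this could run as an ordinary unit test next to the mocks, so that a drifted mock fails fast instead of producing a confusing Workflow error.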
TypeScript
Implement only the relevant Activities for the Workflow being tested.
- TypeScript
import { Worker } from '@temporalio/worker';
import type * as activities from './activities';

// Creating a mock object of the activities.
const mockActivities: Partial<typeof activities> = {
  makeHTTPRequest: async () => '99',
};

// Creating a worker with the mocked activities.
const worker = await Worker.create({
  activities: mockActivities,
  // ...
});
- JavaScript
import { Worker } from '@temporalio/worker';

// Creating a mock object of the activities.
const mockActivities = {
  makeHTTPRequest: async () => '99',
};

// Creating a worker with the mocked activities.
const worker = await Worker.create({
  activities: mockActivities,
  // ...
});
Skip time
Some long-running Workflows can persist for months or even years. Using the test framework lets your Workflow code skip time, so that tests complete in seconds rather than taking the full duration the Workflow specifies.
For example, if you have a Workflow sleep for a day, or have an Activity failure with a long retry interval, you don't need to wait the entire length of the sleep period to test whether the sleep function works. Instead, test the logic that happens after the sleep by skipping forward in time and complete your tests in a timely manner.
Skipping time is not relevant to unit testing Workflow code, because in that case you’re mocking functions that take time, like sleep and Activity calls.
The test framework included in most SDKs is an in-memory implementation of Temporal Server that supports skipping time.
Time is a global property of an instance of TestWorkflowEnvironment: skipping time (either automatically or manually) applies to all currently running tests.
If you need different time behaviors for different tests, run your tests in series or with separate instances of the test server.
For example, you could run all tests with automatic time skipping in parallel, and then all tests with manual time skipping in series, and then all tests without time skipping in parallel.
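To illustrate what skipping time means, here is a toy model of a virtual clock built on asyncio. It is a sketch of the general idea only and makes no claim about how the Temporal test server is implemented; FakeClock, sleeper, and main are hypothetical names. Sleeps wait on futures instead of real time, and advancing the clock resolves every timer that has become due.

```python
import asyncio
import heapq
import itertools


class FakeClock:
    """A toy virtual clock: sleeps wait on futures that `advance` resolves."""

    def __init__(self) -> None:
        self.now = 0.0
        self._timers = []  # heap of (deadline, sequence number, future)
        self._seq = itertools.count()

    async def sleep(self, seconds: float) -> None:
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._timers, (self.now + seconds, next(self._seq), fut))
        await fut

    async def advance(self, seconds: float) -> None:
        """Move virtual time forward, firing every timer that becomes due."""
        target = self.now + seconds
        while self._timers and self._timers[0][0] <= target:
            deadline, _, fut = heapq.heappop(self._timers)
            self.now = deadline
            fut.set_result(None)
            # Yield so the woken coroutine can run and schedule its next timer.
            await asyncio.sleep(0)
        self.now = target


DAY = 24 * 60 * 60


async def sleeper(clock: FakeClock, days_seen: list) -> None:
    # Mirrors a Workflow that sleeps one day at a time.
    for day in range(1, 4):
        await clock.sleep(DAY)
        days_seen.append(day)


async def main() -> list:
    clock = FakeClock()
    days_seen = []
    task = asyncio.create_task(sleeper(clock, days_seen))
    await asyncio.sleep(0)             # let the sleeper register its first timer
    await clock.advance(25 * 60 * 60)  # 25 virtual hours: only the first timer fires
    snapshot = list(days_seen)
    await clock.advance(2 * DAY)       # the remaining two timers fire
    await task
    return [snapshot, days_seen]


result = asyncio.run(main())
```

The run finishes almost instantly in real time even though three virtual days pass. Because the clock is shared state, two tests driving the same FakeClock would affect each other, which mirrors why time skipping applies to every test that shares one test environment.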
Setting up
Learn to set up the time-skipping test framework in the SDK of your choice.
Go, Java
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
PHP
- In the tests folder, create bootstrap.php with the following contents:
declare(strict_types=1);
require __DIR__ . '/../vendor/autoload.php';
use Temporal\Testing\Environment;
$environment = Environment::create();
$environment->start();
register_shutdown_function(fn () => $environment->stop());
If you don't want to run the test server with all of your tests, you can add a condition to start the test server only if the RUN_TEMPORAL_TEST_SERVER environment variable is present:
if (getenv('RUN_TEMPORAL_TEST_SERVER') !== false) {
    $environment = Environment::create();
    $environment->start('./rr serve -c .rr.silent.yaml -w tests');
    register_shutdown_function(fn() => $environment->stop());
}
- Add bootstrap.php and the TEMPORAL_ADDRESS environment variable to phpunit.xml:
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd"
         bootstrap="tests/bootstrap.php"
>
    <php>
        <env name="TEMPORAL_ADDRESS" value="127.0.0.1:7233" />
    </php>
</phpunit>
- Add the test server executable to .gitignore:
temporal-test-server
Python
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
TypeScript
npm install @temporalio/testing
The @temporalio/testing package downloads the test server and exports TestWorkflowEnvironment, which you use to connect the Client and Worker to the test server and interact with the test server.
TestWorkflowEnvironment.createTimeSkipping starts the test server.
A typical test suite should set up a single instance of the test environment to be reused in all tests (for example, in a Jest beforeAll hook or a Mocha before() hook).
import { TestWorkflowEnvironment } from '@temporalio/testing';

let testEnv: TestWorkflowEnvironment;

// beforeAll and afterAll are injected by Jest
beforeAll(async () => {
  testEnv = await TestWorkflowEnvironment.createTimeSkipping();
});

afterAll(async () => {
  await testEnv?.teardown();
});
TestWorkflowEnvironment has a client.workflow for executing Workflows and a nativeConnection for creating Workers:
import { Worker } from '@temporalio/worker';
import { v4 as uuid4 } from 'uuid';
import { workflowFoo } from './workflows';

test('workflowFoo', async () => {
  const worker = await Worker.create({
    connection: testEnv.nativeConnection,
    taskQueue: 'test',
    // ...
  });
  const result = await worker.runUntil(
    testEnv.client.workflow.execute(workflowFoo, {
      workflowId: uuid4(),
      taskQueue: 'test',
    })
  );
  expect(result).toEqual('foo');
});
This test uses the test connection to create a Worker, runs the Worker until the Workflow is complete, and then makes an assertion about the Workflow's result.
The Workflow is executed using testEnv.client.workflow, which is connected to the test server.
Automatic method
You can skip time automatically in the SDK of your choice. Start a test server process that skips time as needed. For example, in the time-skipping mode, Timers, which include sleeps and conditional timeouts, are fast-forwarded except when Activities are running.
Go, Java, PHP
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Python
Use the start_time_skipping() method to start a test server process and skip time automatically.
Use the start_local() method for a full local Temporal Server.
Use the from_client() method for an existing Temporal Server.
TypeScript
The test server starts in "normal" time.
When you use TestWorkflowEnvironment.workflowClient.execute() or .result(), the test server switches to "skipped" time mode until the Workflow completes.
In "skipped" mode, timers (sleep() calls and condition() timeouts) are fast-forwarded except when Activities are running.
workflows.ts
- TypeScript
import { sleep } from '@temporalio/workflow';

export async function sleeperWorkflow() {
  await sleep('1 day');
}
- JavaScript
import { sleep } from '@temporalio/workflow';

export async function sleeperWorkflow() {
  await sleep('1 day');
}
test.ts
- TypeScript
import { Worker } from '@temporalio/worker';
import { v4 as uuid } from 'uuid';
import { sleeperWorkflow } from './workflows';

// testEnv is the TestWorkflowEnvironment created during setup.
test('sleep completes almost immediately', async () => {
  const worker = await Worker.create({
    connection: testEnv.nativeConnection,
    taskQueue: 'test',
    workflowsPath: require.resolve('./workflows'),
  });
  // Does not wait an entire day
  await worker.runUntil(
    testEnv.workflowClient.execute(sleeperWorkflow, {
      workflowId: uuid(),
      taskQueue: 'test',
    }),
  );
});
- JavaScript
import { Worker } from '@temporalio/worker';
import { v4 as uuid } from 'uuid';
import { sleeperWorkflow } from './workflows';

// testEnv is the TestWorkflowEnvironment created during setup.
test('sleep completes almost immediately', async () => {
  const worker = await Worker.create({
    connection: testEnv.nativeConnection,
    taskQueue: 'test',
    workflowsPath: require.resolve('./workflows'),
  });
  // Does not wait an entire day
  await worker.runUntil(testEnv.workflowClient.execute(sleeperWorkflow, {
    workflowId: uuid(),
    taskQueue: 'test',
  }));
});
Manual method
Learn to skip time manually in the SDK of your choice.
Go, Java, PHP
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Python
To implement time skipping, use the start_time_skipping() static method.
from temporalio.testing import WorkflowEnvironment

async def test_manual_time_skipping():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Your code here
        # You can use the env.sleep(seconds) method to manually advance time
        await env.sleep(3)  # This will advance time by 3 seconds
        # Your code here
TypeScript
You can call testEnv.sleep() from your test code to advance the test server's time.
This is useful for testing intermediate states or indefinitely long-running Workflows.
However, to use testEnv.sleep(), you need to avoid automatic time skipping by starting the Workflow with .start() instead of .execute() (and not calling .result()).
workflow.ts
- TypeScript
import { defineQuery, setHandler, sleep } from '@temporalio/workflow';

export const daysQuery = defineQuery('days');

export async function sleeperWorkflow() {
  let numDays = 0;
  setHandler(daysQuery, () => numDays);
  for (let i = 0; i < 100; i++) {
    await sleep('1 day');
    numDays++;
  }
}
- JavaScript
import { defineQuery, setHandler, sleep } from '@temporalio/workflow';

export const daysQuery = defineQuery('days');

export async function sleeperWorkflow() {
  let numDays = 0;
  setHandler(daysQuery, () => numDays);
  for (let i = 0; i < 100; i++) {
    await sleep('1 day');
    numDays++;
  }
}
test.ts
- TypeScript
test('sleeperWorkflow counts days correctly', async () => {
  // `start()` starts the test server in "normal" mode, not skipped time mode.
  // If you don't advance time using `testEnv.sleep()`, then `sleeperWorkflow()`
  // will run for days.
  const handle = await testEnv.workflowClient.start(sleeperWorkflow, {
    workflowId: uuid4(),
    taskQueue,
  });

  let numDays = await handle.query(daysQuery);
  assert.equal(numDays, 0);

  // Advance the test server's time by 25 hours
  await testEnv.sleep('25 hours');
  numDays = await handle.query(daysQuery);
  assert.equal(numDays, 1);

  await testEnv.sleep('25 hours');
  numDays = await handle.query(daysQuery);
  assert.equal(numDays, 2);
});
- JavaScript
test('sleeperWorkflow counts days correctly', async () => {
  // `start()` starts the test server in "normal" mode, not skipped time mode.
  // If you don't advance time using `testEnv.sleep()`, then `sleeperWorkflow()`
  // will run for days.
  const handle = await testEnv.workflowClient.start(sleeperWorkflow, {
    workflowId: uuid4(),
    taskQueue,
  });

  let numDays = await handle.query(daysQuery);
  assert.equal(numDays, 0);

  // Advance the test server's time by 25 hours
  await testEnv.sleep('25 hours');
  numDays = await handle.query(daysQuery);
  assert.equal(numDays, 1);

  await testEnv.sleep('25 hours');
  numDays = await handle.query(daysQuery);
  assert.equal(numDays, 2);
});
Skip time in Activities
Learn to skip time in Activities in the SDK of your choice.
Go, Java, PHP, Python
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
TypeScript
Call TestWorkflowEnvironment.sleep from the mock Activity.
In the following test, processOrderWorkflow sends a notification to the user after one day.
The processOrder mocked Activity calls testEnv.sleep('2 days'), during which the Workflow sends the email (by calling the sendNotificationEmail Activity).
Then, after the Workflow completes, we assert that sendNotificationEmail was called.
Workflow implementation
timer-examples/src/workflows.ts
- TypeScript
export async function processOrderWorkflow({
  orderProcessingMS,
  sendDelayedEmailTimeoutMS,
}: ProcessOrderOptions): Promise<string> {
  let processing = true;
  // Dynamically define the timeout based on given input
  const { processOrder } = proxyActivities<ReturnType<typeof createActivities>>(
    {
      startToCloseTimeout: orderProcessingMS,
    },
  );
  const processOrderPromise = processOrder().then(() => {
    processing = false;
  });
  await Promise.race([processOrderPromise, sleep(sendDelayedEmailTimeoutMS)]);
  if (processing) {
    await sendNotificationEmail();
    await processOrderPromise;
  }
  return 'Order completed!';
}
- JavaScript
export async function processOrderWorkflow({ orderProcessingMS, sendDelayedEmailTimeoutMS, }) {
  let processing = true;
  // Dynamically define the timeout based on given input
  const { processOrder } = proxyActivities({
    startToCloseTimeout: orderProcessingMS,
  });
  const processOrderPromise = processOrder().then(() => {
    processing = false;
  });
  await Promise.race([processOrderPromise, sleep(sendDelayedEmailTimeoutMS)]);
  if (processing) {
    await sendNotificationEmail();
    await processOrderPromise;
  }
  return 'Order completed!';
}
timer-examples/src/test/workflows.test.ts
- TypeScript
it('sends reminder email if processOrder does not complete in time', async () => {
  // This test doesn't actually take days to complete: the TestWorkflowEnvironment starts the
  // Test Server, which automatically skips time when there are no running Activities.
  let emailSent = false;
  const mockActivities: ReturnType<typeof createActivities> = {
    async processOrder() {
      // Test server switches to "normal" time while an Activity is executing.
      // Call `env.sleep` to skip ahead 2 days, by which time sendNotificationEmail
      // should have been called.
      await env.sleep('2 days');
    },
    async sendNotificationEmail() {
      emailSent = true;
    },
  };
  const worker = await Worker.create({
    connection: env.nativeConnection,
    taskQueue: 'test',
    workflowsPath: require.resolve('../workflows'),
    activities: mockActivities,
  });
  await worker.runUntil(
    env.client.workflow.execute(processOrderWorkflow, {
      workflowId: uuid(),
      taskQueue: 'test',
      args: [{
        orderProcessingMS: ms('3 days'),
        sendDelayedEmailTimeoutMS: ms('1 day'),
      }],
    }),
  );
  assert.ok(emailSent);
});
- JavaScript
it('sends reminder email if processOrder does not complete in time', async () => {
  // This test doesn't actually take days to complete: the TestWorkflowEnvironment starts the
  // Test Server, which automatically skips time when there are no running Activities.
  let emailSent = false;
  const mockActivities = {
    async processOrder() {
      // Test server switches to "normal" time while an Activity is executing.
      // Call `env.sleep` to skip ahead 2 days, by which time sendNotificationEmail
      // should have been called.
      await env.sleep('2 days');
    },
    async sendNotificationEmail() {
      emailSent = true;
    },
  };
  const worker = await Worker.create({
    connection: env.nativeConnection,
    taskQueue: 'test',
    workflowsPath: require.resolve('../workflows'),
    activities: mockActivities,
  });
  await worker.runUntil(env.client.workflow.execute(processOrderWorkflow, {
    workflowId: uuid(),
    taskQueue: 'test',
    args: [{
      orderProcessingMS: ms('3 days'),
      sendDelayedEmailTimeoutMS: ms('1 day'),
    }],
  }));
  assert.ok(emailSent);
});
Workflow context
For a function or method to run in the Workflow context (where it can get the current Workflow info, or where it runs inside the sandbox in the case of TypeScript or Python), it needs to be run by the Worker as if it were a Workflow.
This section is applicable in Python and TypeScript. In Python, we allow testing of Workflows only and not generic Workflow-related code.
Go, Java, PHP
Not applicable to this SDK.
Python
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
TypeScript
To test a function in your Workflow code that isn’t a Workflow, put the file it’s exported from in WorkerOptions.workflowsPath. Then execute the function as if it were a Workflow:
workflows/file-with-workflow-function-to-test.ts
- TypeScript
import { sleep } from '@temporalio/workflow';

export async function functionToTest(): Promise<number> {
  await sleep('1 day');
  return 42;
}
- JavaScript
import { sleep } from '@temporalio/workflow';

export async function functionToTest() {
  await sleep('1 day');
  return 42;
}
test.ts
- TypeScript
const worker = await Worker.create({
  connection: testEnv.nativeConnection,
  workflowsPath: require.resolve(
    './workflows/file-with-workflow-function-to-test',
  ),
});
const result = await worker.runUntil(
  testEnv.workflowClient.execute(functionToTest, workflowOptions),
);
assert.equal(result, 42);
- JavaScript
const worker = await Worker.create({
  connection: testEnv.nativeConnection,
  workflowsPath: require.resolve('./workflows/file-with-workflow-function-to-test'),
});
const result = await worker.runUntil(testEnv.workflowClient.execute(functionToTest, workflowOptions));
assert.equal(result, 42);
If functionToTest starts a Child Workflow, that Workflow must be exported from the same file (so that the Worker knows about it):
- TypeScript
import * as wf from '@temporalio/workflow';
import { someWorkflowToRunAsChild } from './some-workflow';

export { someWorkflowToRunAsChild };

export async function functionToTest(): Promise<number> {
  const result = await wf.executeChild(someWorkflowToRunAsChild);
  return result + 42;
}
- JavaScript
import * as wf from '@temporalio/workflow';
import { someWorkflowToRunAsChild } from './some-workflow';

export { someWorkflowToRunAsChild };

export async function functionToTest() {
  const result = await wf.executeChild(someWorkflowToRunAsChild);
  return result + 42;
}
Assert in Workflow
The assert statement is a convenient way to insert debugging assertions into the Workflow context.
The assert method is available in Python and TypeScript.
Go, Java, PHP
Not applicable to this SDK.
Python
For information about assert statements in Python, see assert in the Python Language Reference.
TypeScript
The Node.js assert module is included in Workflow bundles.
By default, a failed assert statement throws AssertionError, which causes a Workflow Task to fail and be indefinitely retried.
To prevent this behavior, use workflowInterceptorModules from @temporalio/testing.
These interceptors catch an AssertionError and turn it into an ApplicationFailure that fails the entire Workflow Execution (not just the Workflow Task).
workflows/file-with-workflow-function-to-test.ts
- TypeScript
import assert from 'assert';

export async function functionToTest() {
  assert.ok(false);
}
- JavaScript
import assert from 'assert';

export async function functionToTest() {
  assert.ok(false);
}
test.ts
- TypeScript
import {
  TestWorkflowEnvironment,
  workflowInterceptorModules,
} from '@temporalio/testing';

const worker = await Worker.create({
  connection: testEnv.nativeConnection,
  interceptors: {
    workflowModules: workflowInterceptorModules,
  },
  workflowsPath: require.resolve(
    './workflows/file-with-workflow-function-to-test',
  ),
});
await worker.runUntil(
  testEnv.workflowClient.execute(functionToTest, workflowOptions), // throws WorkflowFailedError
);
- JavaScript
import { workflowInterceptorModules } from '@temporalio/testing';

const worker = await Worker.create({
  connection: testEnv.nativeConnection,
  interceptors: {
    workflowModules: workflowInterceptorModules,
  },
  workflowsPath: require.resolve('./workflows/file-with-workflow-function-to-test'),
});
await worker.runUntil(testEnv.workflowClient.execute(functionToTest, workflowOptions)); // throws WorkflowFailedError
Replay
Replay recreates the exact state of a Workflow Execution. You can replay a Workflow from the beginning of its Event History.
Replay succeeds only if the Workflow Definition is compatible with the provided history from a deterministic point of view.
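To illustrate what compatibility "from a deterministic point of view" means, here is a toy model in plain Python. It is only a sketch and is not how any Temporal SDK implements replay: the names `replay`, `NonDeterminismError`, and the `(kind, name)` command tuples are hypothetical. The idea it shows is that re-running the Workflow code must produce the same sequence of commands that the history recorded.

```python
from typing import Callable, Iterator, List, Tuple

Command = Tuple[str, str]  # hypothetical (kind, name) pair, e.g. ("activity", "charge_card")


class NonDeterminismError(Exception):
    """Raised when re-executed code diverges from the recorded history."""


def replay(workflow: Callable[[], Iterator[Command]], recorded: List[Command]) -> None:
    """Re-run `workflow` and compare each emitted command with the recorded one."""
    emitted = list(workflow())
    for index, (got, expected) in enumerate(zip(emitted, recorded)):
        if got != expected:
            raise NonDeterminismError(
                f"command {index}: code produced {got}, history recorded {expected}"
            )
    if len(emitted) < len(recorded):
        raise NonDeterminismError("code produced fewer commands than the history recorded")


def order_workflow_v1() -> Iterator[Command]:
    yield ("activity", "charge_card")
    yield ("timer", "wait_1h")
    yield ("activity", "ship_order")


def order_workflow_v2() -> Iterator[Command]:
    # The first two steps were swapped, so old histories no longer match.
    yield ("timer", "wait_1h")
    yield ("activity", "charge_card")
    yield ("activity", "ship_order")


history = list(order_workflow_v1())
replay(order_workflow_v1, history)  # compatible: returns without raising
```

Replaying `order_workflow_v2` against the same `history` raises `NonDeterminismError`, which is the kind of incompatibility a replay test is meant to catch before deployment.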
When you test changes to your Workflow Definitions, we recommend doing the following as part of your CI checks:
- Determine which Workflow Types or Task Queues (or both) will be targeted by the Worker code under test.
- Download the Event Histories of a representative set of recent open and closed Workflows from each Task Queue, either programmatically using the SDK client or via tctl.
- Run the Event Histories through replay.
- Fail CI if any error is encountered during replay.
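The CI steps above can be sketched as a small driver script. This is a minimal sketch under stated assumptions: the Event Histories have already been downloaded as JSON files into one directory, and `replay_one` is a hypothetical callable that you would implement by wrapping your SDK's replayer. Neither `replay_directory` nor `main` is part of any Temporal SDK.

```python
import json
import sys
from pathlib import Path
from typing import Callable, Dict, List


def replay_directory(
    history_dir: Path, replay_one: Callable[[str, dict], None]
) -> Dict[str, str]:
    """Replay every *.json history in `history_dir`; return {file name: error text}."""
    failures: Dict[str, str] = {}
    for path in sorted(history_dir.glob("*.json")):
        history = json.loads(path.read_text(encoding="utf-8"))
        try:
            # Hypothetical hook: wrap your SDK's replayer here.
            replay_one(path.stem, history)
        except Exception as err:  # collect every failure instead of stopping early
            failures[path.name] = f"{type(err).__name__}: {err}"
    return failures


def main(argv: List[str], replay_one: Callable[[str, dict], None]) -> int:
    """Return a non-zero exit code if any replay failed, so that CI fails."""
    failures = replay_directory(Path(argv[1]), replay_one)
    for name, error in failures.items():
        print(f"replay failed for {name}: {error}", file=sys.stderr)
    return 1 if failures else 0
```

A CI job could call `sys.exit(main(sys.argv, replay_one))` after the download step. Collecting all failures before exiting gives one report per run instead of one failure at a time.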
The following are examples of fetching and replaying Event Histories:
- Go
- Java
- PHP
- Python
- TypeScript
Use the worker.WorkflowReplayer to replay an existing Workflow Execution from its Event History to replicate errors.
For example, the following code retrieves the Event History of a Workflow:
import (
	"context"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/api/history/v1"
	"go.temporal.io/sdk/client"
)

func GetWorkflowHistory(ctx context.Context, client client.Client, id, runID string) (*history.History, error) {
	var hist history.History
	iter := client.GetWorkflowHistory(ctx, id, runID, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	for iter.HasNext() {
		event, err := iter.Next()
		if err != nil {
			return nil, err
		}
		hist.Events = append(hist.Events, event)
	}
	return &hist, nil
}
This history can then be used to replay.
For example, the following code creates a WorkflowReplayer and registers the YourWorkflow Workflow function.
Then it calls ReplayWorkflowHistory to replay the Event History and returns any resulting error.
import (
	"context"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
)

func ReplayWorkflow(ctx context.Context, client client.Client, id, runID string) error {
	hist, err := GetWorkflowHistory(ctx, client, id, runID)
	if err != nil {
		return err
	}
	replayer := worker.NewWorkflowReplayer()
	replayer.RegisterWorkflow(YourWorkflow)
	return replayer.ReplayWorkflowHistory(nil, hist)
}
The code above will cause the Worker to re-execute the Workflow's Workflow Function using the original Event History. If a noticeably different code path was followed or some code caused a deadlock, the returned error reports it. Replaying a Workflow Execution locally is a good way to see exactly what code path was taken for given input and events.
You can replay many Event Histories by registering all the needed Workflow implementations and then calling ReplayWorkflowHistory repeatedly.
To replay Workflow Executions, use the WorkflowReplayer class in the temporal-testing package.
In the following example, Event Histories are downloaded from the server, and then replayed. Note that this requires Advanced Visibility to be enabled.
// Note we assume you already have a WorkflowServiceStubs (`service`) and WorkflowClient (`client`)
// in scope.
ListWorkflowExecutionsRequest listWorkflowExecutionRequest =
    ListWorkflowExecutionsRequest.newBuilder()
        .setNamespace(client.getOptions().getNamespace())
        .setQuery("TaskQueue = 'mytaskqueue'")
        .build();
ListWorkflowExecutionsResponse listWorkflowExecutionsResponse =
    service.blockingStub().listWorkflowExecutions(listWorkflowExecutionRequest);
List<WorkflowExecutionHistory> histories =
    listWorkflowExecutionsResponse.getExecutionsList().stream()
        .map(
            (info) -> {
              GetWorkflowExecutionHistoryResponse weh =
                  service.blockingStub().getWorkflowExecutionHistory(
                      GetWorkflowExecutionHistoryRequest.newBuilder()
                          // Use the same Namespace as the list request above.
                          .setNamespace(client.getOptions().getNamespace())
                          .setExecution(info.getExecution())
                          .build());
              return new WorkflowExecutionHistory(
                  weh.getHistory(), info.getExecution().getWorkflowId());
            })
        .collect(Collectors.toList());
WorkflowReplayer.replayWorkflowExecutions(
    histories, true, WorkflowA.class, WorkflowB.class, WorkflowC.class);
In the next example, a single history is loaded from a JSON file on disk:
File file = new File("my_history.json");
WorkflowReplayer.replayWorkflowExecution(file, MyWorkflow.class);
In both examples, if Event History is non-deterministic, an error is thrown.
You can choose to wait until all histories have been replayed with replayWorkflowExecutions by setting the failFast argument to false.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
To replay Workflow Executions, use the replay_workflows or replay_workflow methods, passing one or more Event Histories as arguments.
In the following example (which, as of server v1.18, requires Advanced Visibility to be enabled), Event Histories are downloaded from the server and then replayed. If any replay fails, the code raises an exception.
workflows = client.list_workflows("TaskQueue=foo and StartTime > '2022-01-01T12:00:00'")
histories = workflows.map_histories()
replayer = Replayer(
    workflows=[MyWorkflowA, MyWorkflowB, MyWorkflowC]
)
await replayer.replay_workflows(histories)
In the next example, a single history is loaded from a JSON string:
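replayer = Replayer(workflows=[YourWorkflow])
# from_json takes the Workflow Id first, then the history JSON (string or dict).
await replayer.replay_workflow(
    WorkflowHistory.from_json("your-workflow-id", history_json_str)
)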
In both examples, if Event History is non-deterministic, an error is thrown.
You can choose to wait until all histories have been replayed with replay_workflows by setting the fail_fast option to False.
If the Workflow History is exported by Temporal Web UI or through tctl, you can pass the exported history either as a JSON string or as a Python dictionary. To get a dictionary, read the exported file with the json.load() function, which takes a file object and returns the parsed JSON object.
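As a sketch of the two input forms described above, the following snippet reads an exported history file with `json.load()` (yielding a dictionary) and also reads the same file as a JSON string. The file name and the tiny sample history are placeholders written to a temporary directory so that the snippet runs on its own; a real export contains many more fields.

```python
import json
import tempfile
from pathlib import Path

# Placeholder content standing in for a history exported from the Web UI or tctl.
sample = {"events": [{"eventId": "1", "eventType": "WorkflowExecutionStarted"}]}

with tempfile.TemporaryDirectory() as tmp:
    history_path = Path(tmp) / "history.json"  # hypothetical file name
    history_path.write_text(json.dumps(sample), encoding="utf-8")

    # Form 1: json.load() takes a file object and returns a dictionary.
    with history_path.open(encoding="utf-8") as f:
        history_dict = json.load(f)

    # Form 2: the raw file content as a JSON string.
    history_json_str = history_path.read_text(encoding="utf-8")

# Both forms describe the same history.
assert json.loads(history_json_str) == history_dict
```

Either `history_dict` or `history_json_str` can then be used as the history input when building the object passed to the replayer.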
To replay one or more Event Histories, use Worker.runReplayHistories or Worker.runReplayHistory.
In all examples, if the Workflow History is non-deterministic, a DeterminismViolationError will be thrown.
In the following example (which, as of server 1.18, requires advanced visibility to be enabled), histories are downloaded from the server and then replayed by passing in a client and a set of executions. The code will throw an exception if any replay fails.
- TypeScript
- JavaScript
const executions = client.workflow.list({
  query: 'TaskQueue=foo and StartTime > "2022-01-01T12:00:00"',
});
const histories = executions.intoHistories();
await Worker.runReplayHistories(
  {
    workflowsPath: require.resolve('./your/workflows'),
  },
  histories,
);
const executions = client.workflow.list({
  query: 'TaskQueue=foo and StartTime > "2022-01-01T12:00:00"',
});
const histories = executions.intoHistories();
await Worker.runReplayHistories({
  workflowsPath: require.resolve('./your/workflows'),
}, histories);
In the next example, a single history is loaded from a JSON file on disk:
- TypeScript
- JavaScript
import fs from 'fs';
const filePath = './history_file.json';
// Await the file read first, then parse the resulting string.
const hist = JSON.parse(await fs.promises.readFile(filePath, 'utf8'));
await Worker.runReplayHistory(
  {
    workflowsPath: require.resolve('./your/workflows'),
  },
  hist,
);
import fs from 'fs';
const filePath = './history_file.json';
// Await the file read first, then parse the resulting string.
const hist = JSON.parse(await fs.promises.readFile(filePath, 'utf8'));
await Worker.runReplayHistory({
  workflowsPath: require.resolve('./your/workflows'),
}, hist);
Here, we show downloading a history and replaying it separately:
replay-history/src/replayer.ts
- TypeScript
- JavaScript
const conn = await Connection.connect(
  /* { address: 'temporal.prod.company.com' } */
);
const { history } = await conn.workflowService.getWorkflowExecutionHistory({
  namespace: 'default',
  execution: {
    workflowId: 'calc',
  },
});
const conn = await Connection.connect(
  /* { address: 'temporal.prod.company.com' } */
);
const { history } = await conn.workflowService.getWorkflowExecutionHistory({
  namespace: 'default',
  execution: {
    workflowId: 'calc',
  },
});
Then call Worker.runReplayHistory.
replay-history/src/replayer.ts
- TypeScript
- JavaScript
await Worker.runReplayHistory(
  {
    workflowsPath: require.resolve('./workflows'),
    replayName: 'calc',
  },
  history,
);
await Worker.runReplayHistory({
  workflowsPath: require.resolve('./workflows'),
  replayName: 'calc',
}, history);