Async Generators in Use Pt. 2: Multiple DynamoDB Queries in Parallel
Recap
This is a follow-up to my previous post on using async generators in conjunction with DynamoDB pagination. As we saw last time, async generators can be a handy way to iterate over “eventually available” collections:
async function* getPaginatedResults(dynamoClient, params) {
  let lastEvaluatedKey;
  do {
    const queryResult = await dynamoClient.query(params).promise();
    lastEvaluatedKey = queryResult.LastEvaluatedKey;
    params.ExclusiveStartKey = lastEvaluatedKey;
    yield queryResult.Items;
  } while (lastEvaluatedKey);
}
for await (const pageOfItems of getPaginatedResults(dynamoClient, someQueryParams)) {
  // do something with the page of items
}
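If you’d like to try the generator without a real table, a small stand-in client can help. The sketch below is only an assumption for local experimentation: createFakeClient and collectPages are hypothetical helpers (not part of the AWS SDK), and the fake client mimics nothing more than the query(params).promise() shape used above.

```javascript
// Hypothetical stand-in for a DynamoDB client; NOT part of the AWS SDK.
// It mimics only the `query(params).promise()` shape the generator relies on.
function createFakeClient(pages) {
  return {
    query(params) {
      // Treat ExclusiveStartKey as the index of the next page to serve.
      const index = params.ExclusiveStartKey ? params.ExclusiveStartKey.page : 0;
      const isLastPage = index >= pages.length - 1;
      return {
        promise: async () => ({
          Items: pages[index] || [],
          // The key is left undefined once there are no more pages.
          LastEvaluatedKey: isLastPage ? undefined : { page: index + 1 },
        }),
      };
    },
  };
}

// A copy of the generator, repeated so this snippet runs on its own.
async function* getPaginatedResults(dynamoClient, params) {
  let lastEvaluatedKey;
  do {
    const queryResult = await dynamoClient.query(params).promise();
    lastEvaluatedKey = queryResult.LastEvaluatedKey;
    params.ExclusiveStartKey = lastEvaluatedKey;
    yield queryResult.Items;
  } while (lastEvaluatedKey);
}

// Hypothetical helper: gathers every page into an array for inspection.
async function collectPages(dynamoClient, params) {
  const collected = [];
  for await (const pageOfItems of getPaginatedResults(dynamoClient, params)) {
    collected.push(pageOfItems);
  }
  return collected;
}
```

For example, collectPages(createFakeClient([[1, 2], [3]]), { TableName: 'demo' }) resolves to [[1, 2], [3]].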
This works great as is for making a single query that can potentially be paginated. What if we need to run multiple such queries in parallel? Let’s try a naïve approach first.
👎 Stacking for-await-of Loops
The for-await-of loop (like the regular for loop) doesn’t allow the code below itself to execute until it’s done iterating:
for await (const pageOfItems of getPaginatedResults(dynamoClient, someQueryParams)) {
  // do something with the page of items
}
// <-- Can’t get to here until the loop is over
This can present a problem in cases like ours. If we just put two for-await-of loops adjacent to each other, the second query won’t start until the first one loops through all of its pages:
for await (const pageOfItems of getPaginatedResults(dynamoClient, someQueryParams)) {
  // do something with the page of items
}
// Only when the first query is done can we start the next one.
// If the second query doesn’t depend on the first one’s results,
// running them sequentially is suboptimal.
for await (const pageOfItems of getPaginatedResults(dynamoClient, someOtherQueryParams)) {
  // do something with the page of items
}
Looks like just putting two for-await-of loops together isn’t achieving what we want, so we need to look for a different approach.
👍 Wrapping a for-await-of Loop in an Async Function
Before proceeding, let’s assume for simplicity that we have just two queries to run in parallel (to be clear, since each query can be paginated, we’re potentially talking about two parallel series of requests).
In general, if we want two parallel asynchronous tasks to complete before continuing, we can use Promise.all, passing in the promises representing the completion of each task:
async function task1() {}
async function task2() {}
const p1 = task1();
const p2 = task2();
await Promise.all([p1, p2]);
// ...continue execution
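A small timing sketch may make this more concrete. Here sleep and runInParallel are hypothetical helpers standing in for real work. The point is that both tasks start as soon as they are called, not when they are awaited, so the total wait should be close to that of the longer task rather than the sum of the two.

```javascript
// Hypothetical helper: resolves with `label` after roughly `ms` milliseconds.
const sleep = (ms, label) =>
  new Promise((resolve) => setTimeout(() => resolve(label), ms));

async function runInParallel() {
  const started = Date.now();
  // Both timers start here, before either promise is awaited.
  const p1 = sleep(200, 'task1');
  const p2 = sleep(200, 'task2');
  // Promise.all resolves with the results in the same order as its input.
  const results = await Promise.all([p1, p2]);
  return { results, elapsedMs: Date.now() - started };
}
```

Calling runInParallel() should report an elapsed time of roughly 200 ms, whereas awaiting each sleep one after the other would take about 400 ms.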
In our case, each task is a paginated query. If we can somehow get a promise that indicates all pages of a query have been processed, we’ll be able to use the above scheme. How do we get a promise like that? Well, there’s a hint in the example above: if we put our old for-await-of loop into an async function, the promise returned from that function will essentially represent the completion of that loop. Let’s do just that:
async function paginateQuery(dynamoClient, queryParams) {
  for await (const pageOfItems of getPaginatedResults(dynamoClient, queryParams)) {
    // do something with the page of items
  }
}
Now, for this to become a truly reusable helper, we need to parameterize it with the body of the loop. Here’s an example for querying users and posts in parallel:
async function paginateQuery(dynamoClient, queryParams, callback) {
  for await (const pageOfItems of getPaginatedResults(dynamoClient, queryParams)) {
    // Optional extension: break out of the loop here if the callback returns false.
    await callback(pageOfItems);
  }
}
const usersPromise = paginateQuery(client, usersParams, async (pageOfUsers) => { /* ... */ });
const postsPromise = paginateQuery(client, postsParams, async (pageOfPosts) => { /* ... */ });
await Promise.all([usersPromise, postsPromise]);
// ...continue execution
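To see the two queries actually overlap, here is a self-contained sketch that can be run without AWS. It rests on a few assumptions: createSlowFakeClient is a hypothetical stand-in that only mimics query(params).promise() and adds an artificial delay per page, and the table names, items, and delays are made up for illustration.

```javascript
// Hypothetical fake client; NOT part of the AWS SDK. Each page takes `delayMs`.
function createSlowFakeClient(pages, delayMs) {
  return {
    query(params) {
      const index = params.ExclusiveStartKey ? params.ExclusiveStartKey.page : 0;
      const result = {
        Items: pages[index],
        LastEvaluatedKey: index < pages.length - 1 ? { page: index + 1 } : undefined,
      };
      return {
        promise: () => new Promise((resolve) => setTimeout(() => resolve(result), delayMs)),
      };
    },
  };
}

// Copies of the generator and the helper, repeated so this snippet runs on its own.
async function* getPaginatedResults(dynamoClient, params) {
  let lastEvaluatedKey;
  do {
    const queryResult = await dynamoClient.query(params).promise();
    lastEvaluatedKey = queryResult.LastEvaluatedKey;
    params.ExclusiveStartKey = lastEvaluatedKey;
    yield queryResult.Items;
  } while (lastEvaluatedKey);
}

async function paginateQuery(dynamoClient, queryParams, callback) {
  for await (const pageOfItems of getPaginatedResults(dynamoClient, queryParams)) {
    await callback(pageOfItems);
  }
}

async function runBothQueries() {
  const log = []; // records the order in which pages are handled
  const usersClient = createSlowFakeClient([['u1'], ['u2']], 40);
  const postsClient = createSlowFakeClient([['p1'], ['p2']], 60);
  // Both queries are started before either one is awaited.
  const usersPromise = paginateQuery(usersClient, { TableName: 'users' }, async (page) => {
    log.push(`users:${page[0]}`);
  });
  const postsPromise = paginateQuery(postsClient, { TableName: 'posts' }, async (page) => {
    log.push(`posts:${page[0]}`);
  });
  await Promise.all([usersPromise, postsPromise]);
  return log;
}
```

With these delays, the log returned by runBothQueries() is expected to show pages from the two queries interleaved, with the first page of posts handled before the last page of users, which wouldn’t happen with two stacked loops.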
Wait, we’re back to dealing with callbacks now? A bit anticlimactic, but it seems that we have to in this case. Note that you only have to use this helper where you need parallel queries, although if you decide to use it everywhere, that’s totally reasonable too 😄.
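The comment in paginateQuery mentions optionally breaking out of the loop when the callback returns false. One possible way to do that is sketched below. To keep the snippet short, it takes any async iterable instead of a DynamoDB client, and countingPages and paginateUntilFalse are hypothetical names used only for illustration.

```javascript
// Hypothetical page source: records each page number in `served` as it is produced.
async function* countingPages(total, served) {
  for (let page = 1; page <= total; page++) {
    served.push(page);
    yield [`item-${page}`];
  }
}

// Variant of the helper that stops early when the callback returns false.
async function paginateUntilFalse(pages, callback) {
  for await (const pageOfItems of pages) {
    // Breaking out of for-await-of calls the generator's return() method,
    // so the generator finishes and no further pages are produced.
    if ((await callback(pageOfItems)) === false) break;
  }
}

async function demoEarlyExit() {
  const served = [];
  await paginateUntilFalse(countingPages(5, served), async (page) => {
    // Returning false on the second page ends the pagination there.
    return page[0] !== 'item-2';
  });
  return served;
}
```

Here demoEarlyExit() resolves to [1, 2]: the remaining three pages are never produced because the loop exits as soon as the callback returns false.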
Conclusion
As we’ve seen, directly using a for-await-of loop with an async generator can be pretty convenient but has its limitations. Do let me know if you can think of a more elegant solution!