DynamoDB and distributed lock



Introduction

In this post we take a look at the DynamoDB lock client library, an implementation of a distributed lock on top of DynamoDB. Distributed locks are an important building block in distributed systems, and there are many good online resources on the topic. Here we recommend Martin Kleppmann's post How to do distributed locking. For a better understanding of DynamoDB, you may check the official documentation or the original Dynamo white paper.

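In the following sections we will try to answer the following questions:

  1. How does the library release a lock?
  2. How does the library send heartbeats?
  3. How does the library acquire a lock?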

The objective of this post is to document at a high level how the DynamoDB lock library implements a distributed lock; therefore, some technical details that are not functionally crucial are ignored in the discussion below.

One thing we would like to point out is that the Dynamo paper seems to highlight the eventual consistency aspect of the Dynamo store. On the other hand, DynamoDB itself does support consistent reads.

In order to perform a consistent read, which returns the latest value of an item, we can set the consistentRead flag to true in the GetItemRequest, as illustrated in the code below. It is part of the readFromDynamoDB method.

final GetItemRequest getItemRequest = GetItemRequest.builder().tableName(tableName).key(dynamoDBKey)
        .consistentRead(true)
        .build();

Consistent writes are achieved via conditions in write requests. We will see in the following sections that the DynamoDB lock client library relies heavily on conditions in write requests to achieve consistency. You may find more details about condition expressions in DynamoDB here.
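To make the idea of a conditional write concrete, here is a minimal in-memory sketch. It does not use the AWS SDK: the class ConditionalStore and its putIf method are hypothetical names invented for illustration. The write is applied only when a predicate on the current item holds, which is roughly the role a condition expression plays in a DynamoDB write request.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for a table that supports conditional writes. */
final class ConditionalStore {
    private final Map<String, String> items = new ConcurrentHashMap<>();

    /**
     * Writes newValue under key only if the condition holds for the current value
     * (Optional.empty() when the key is absent). Returns true when the write was applied.
     */
    boolean putIf(String key, String newValue, Predicate<Optional<String>> condition) {
        boolean[] applied = {false};
        // compute() runs atomically per key, so the check and the write cannot interleave.
        items.compute(key, (k, current) -> {
            if (condition.test(Optional.ofNullable(current))) {
                applied[0] = true;
                return newValue;
            }
            return current; // condition failed: leave the item untouched
        });
        return applied[0];
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(items.get(key));
    }
}
```

With this sketch, two clients that both try to create the same item with a "must not exist" condition cannot both succeed: the second call to putIf sees the item and returns false.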

How does the library release a lock?

Let's start with releasing a lock. It is relatively simple because, in the happy path, the client already holds the lock when trying to release it. This functionality is implemented in the releaseLock method.

The first thing to do in the releaseLock method is to check if the client owns the lock:

if (!lockItem.getOwnerName().equals(this.ownerName)) {
    return false;
}

Depending on the options, we either delete the item from the lock table or update the existing item. If we decide to update the existing item in the DynamoDB table, all we need to do is set the isReleased attribute so that the item is marked as released.

Whatever we do, the condition of the execution is the same. The condition is defined here and if we ignore the details about the sortKey, then what we really have is the following expression:

conditionalExpression = PK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION;

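From the name of the variable and according to the comments, in order to release a lock, we will check the following conditions:

  1. The primary key of the lock item exists.
  2. The owner name is the same.
  3. The record version number (RVN) is the same.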

Now that we have the condition, we can execute this.dynamoDB.deleteItem(deleteItemRequest); or this.dynamoDB.updateItem(updateItemRequest);. This is pretty much it for releasing the lock.

Note:

  1. When we release the lock, we don't update the record version number.
  2. When we release the lock, we don't check if the lock has already been released.

It seems that this implementation makes releaseLock idempotent: the client that owns the lock can release the same lock multiple times.
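The release path described above can be sketched with an in-memory table. This is not the library's code: ReleaseSketch, Row, and the release method are hypothetical names, and the sort key is ignored. The sketch only models the three checks (key exists, owner name same, record version number same) and the two outcomes (delete the row or flag it as released).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the release path; not the library's code. */
final class ReleaseSketch {
    /** One row of the lock table, reduced to the attributes the condition inspects. */
    static final class Row {
        final String owner;
        final String rvn;
        final boolean released;

        Row(String owner, String rvn, boolean released) {
            this.owner = owner;
            this.rvn = rvn;
            this.released = released;
        }
    }

    final Map<String, Row> table = new HashMap<>();

    /** Applies the release only if the key exists and both owner and version match. */
    synchronized boolean release(String key, String owner, String rvn, boolean deleteOnRelease) {
        Row row = table.get(key);
        if (row == null || !row.owner.equals(owner) || !row.rvn.equals(rvn)) {
            return false; // conditional check failed
        }
        if (deleteOnRelease) {
            table.remove(key);
        } else {
            // Keep the row, flag it as released, and leave the version number unchanged.
            table.put(key, new Row(row.owner, row.rvn, true));
        }
        return true;
    }
}
```

Because the version number is left unchanged and the released flag is not part of the condition, calling release twice in update mode succeeds both times in this model, which matches the two notes above. In delete mode the second call fails, since the row no longer exists.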

How does the library send heartbeats?

The next topic is heartbeats. The owner of a lock needs to refresh its lease by sending heartbeats so that it can keep the lock. Sending heartbeats is implemented in the sendHeartbeat method. It can be found here.

If we search the code, we will find that the sendHeartbeat method is used here in the run method. Since there is an infinite loop in the run method, we can assume this method runs in the background. If we remove all the exception handling, what the run method does is alternate between sending heartbeats and waiting for a predefined period.

Here is a simplified version of the run method:

/**
 * Loops forever, sending heartbeats for all the locks this thread needs to keep track of.
 */
@Override
public void run() {
    while (true) {

        // sometimes libraries wrap interrupted and other exceptions
        if (this.shuttingDown) {
            throw new InterruptedException();
        }

        final long timeWorkBegins = LockClientUtils.INSTANCE.millisecondTime();
        final Map<String, LockItem> workingCopyOfLocks = new HashMap<>(this.locks);

        for (final Entry<String, LockItem> lockEntry : workingCopyOfLocks.entrySet()) {
            this.sendHeartbeat(lockEntry.getValue());
        }

        final long timeElapsed =
            LockClientUtils.INSTANCE.millisecondTime() - timeWorkBegins;

        // sometimes libraries wrap interrupted and other exceptions
        if (this.shuttingDown) {
            throw new InterruptedException();
        }

        // If we want to heartbeat every 9 seconds, and it took 3 seconds to send the
        // heartbeats, we only sleep 6 seconds
        Thread.sleep(Math.max(this.heartbeatPeriodInMilliseconds - timeElapsed, 0));
    }
}

As we can see, the for loop over workingCopyOfLocks indicates that a client can hold multiple locks at the same time.
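The sleep computation at the end of the loop can be checked in isolation. The helper below is a hypothetical extraction of that one expression (HeartbeatTiming and sleepMillis are not names from the library); it shows that the wait shrinks by the time spent sending heartbeats and never becomes negative.

```java
/** Hypothetical helper isolating the sleep computation from the heartbeat loop. */
final class HeartbeatTiming {
    private HeartbeatTiming() {
    }

    /** Time left to sleep after sending heartbeats took elapsedMillis; never negative. */
    static long sleepMillis(long heartbeatPeriodMillis, long elapsedMillis) {
        return Math.max(heartbeatPeriodMillis - elapsedMillis, 0);
    }
}
```

For the example in the code comment, a 9 second period with 3 seconds of work leaves 6 seconds of sleep. If sending the heartbeats takes longer than the period, the loop starts the next round immediately.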

Before we take a look at the implementation of the sendHeartbeat method, we need to answer the question of when the run method is executed.

The background thread is started in the startBackgroundThread method, which is executed here in the constructor.

/* Helper method that starts a background heartbeating thread */
private Thread startBackgroundThread() {
    final Thread t = namedThreadCreator
        .apply("dynamodb-lock-client-" + lockClientId.addAndGet(1))
        .newThread(this);
    t.setDaemon(true);
    t.start();
    return t;
}

Now we are ready to look at the implementation of the sendHeartbeat method.

Ignoring all the technical details about different options, the first thing we do in the sendHeartbeat method is to check if the client actually owns the lock. The code below performs this validation.

final LockItem lockItem = options.getLockItem();
if (lockItem.isExpired() || !lockItem.getOwnerName().equals(this.ownerName) || lockItem.isReleased()) {
    this.locks.remove(lockItem.getUniqueIdentifier());
    throw new LockNotGrantedException("Cannot send heartbeat because lock is not granted");
}

Next, we synchronize on the lock object. This is needed because sending a heartbeat means we need to update the record in DynamoDB as well as the lock instance held by the client, and we don't want multiple threads to modify the same object concurrently. This also saves us some bandwidth because only one thread can successfully update the item in the DynamoDB table due to the conditions specified in the request.

If we ignore all the technical implementation details, the main steps in the synchronized block in the sendHeartbeat method can be summarized in the following code:

synchronized (lockItem) {
    // prepare the update request.
    setExpressions();
    conditionalExpression = PK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION;
    final String recordVersionNumber = this.generateRecordVersionNumber();
    final UpdateItemRequest updateItemRequest = createUpdateItemRequest(conditionalExpression, recordVersionNumber);

    // update the lock item both in DynamoDB and locally.
    final long lastUpdateOfLock = LockClientUtils.INSTANCE.millisecondTime();
    this.dynamoDB.updateItem(updateItemRequest);
    lockItem.updateRecordVersionNumber(recordVersionNumber, lastUpdateOfLock, leaseDurationToEnsureInMilliseconds);
}

One thing to notice here is that we have two updates: one in DynamoDB (the remote update) and the other on the lockItem instance (the local update).

Here we see the PK_EXISTS_AND_OWNER_NAME_SAME_AND_RVN_SAME_CONDITION again. This is the same condition we have when we try to release the lock. This makes sense because both releasing a lock and sending heartbeats require that the client owns the lock in the first place.

There are two major differences between releasing a lock and sending heartbeats. As we mentioned earlier, when releasing the lock we do not check whether the lock has already been released and we do not update the record version number. However, we need both actions when sending heartbeats. The isReleased check is obviously needed when sending heartbeats because if the lock has been released, we need to acquire it again before performing any operations. The second one might be an implementation detail: when the client sends a heartbeat, conceptually it creates a new lock and acquires it immediately.
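The effect of rotating the record version number on every heartbeat can be illustrated with a small in-memory model. It is a sketch under stated assumptions, not the library's code: HeartbeatSketch and its heartbeat method are hypothetical, each row is reduced to an owner and a version, and a random UUID stands in for whatever generateRecordVersionNumber produces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical in-memory model of a heartbeat; not the library's code. */
final class HeartbeatSketch {
    /** Remote state: lock key -> {owner, recordVersionNumber}. */
    final Map<String, String[]> table = new HashMap<>();

    /**
     * Rotates the record version number if the caller still owns the row with the
     * version it last saw. Returns the new version, or empty if the condition failed.
     */
    synchronized Optional<String> heartbeat(String key, String owner, String lastSeenRvn) {
        String[] row = table.get(key);
        if (row == null || !row[0].equals(owner) || !row[1].equals(lastSeenRvn)) {
            return Optional.empty();
        }
        String newRvn = UUID.randomUUID().toString();
        table.put(key, new String[] {owner, newRvn}); // the "remote" update
        return Optional.of(newRvn); // the caller keeps this as its "local" update
    }
}
```

In this model a caller that reuses an old version number fails the condition, so it has to keep its local copy in step with the table after every successful heartbeat. That is one way to read why the remote and local updates appear together inside the synchronized block.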

How does the library acquire a lock?

The client can only acquire a lock in the following three scenarios:

  1. The client creates a new lock and acquires it.
  2. The client acquires a released lock.
  3. The client acquires an expired lock.

The entry point is the acquireLock method. The method has two parts. The first part does preparation work, including data validation. The second part is an infinite while loop. The infinite loop is needed because if the client cannot acquire the lock, it will retry until it succeeds or a time limit is reached.

Skipping details about options handling, we arrive at the next important piece in the acquireLock method.

//if the existing lock does not exist or exists and is released
if (!existingLock.isPresent() && !options.getAcquireOnlyIfLockAlreadyExists()) {
    return upsertAndMonitorNewLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData,
        item, recordVersionNumber);
} else if (existingLock.isPresent() && existingLock.get().isReleased()) {
    return upsertAndMonitorReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock,
        newLockData, item, recordVersionNumber);
}

As we can see here, if the existing lock is not present we call the upsertAndMonitorNewLock method, and if the existing lock is released we call the upsertAndMonitorReleasedLock method. We will take a look at the implementation details of these two methods later. For now let's keep going and see what else we have in the acquireLock method.

If we get past the condition checks in the above code without returning, it means someone else is holding the lock. There are two scenarios here:

  1. The lock is held by someone else and it is not expired.
  2. The lock is held by someone else and it is expired.

These two scenarios are handled by the code below:

if (lockTryingToBeAcquired.getRecordVersionNumber()
        .equals(existingLock.get().getRecordVersionNumber())) {
    // If the version numbers match, then we can acquire the lock,
    // assuming it has already expired.
    if (lockTryingToBeAcquired.isExpired()) {
        return upsertAndMonitorExpiredLock(
             options, key, sortKey, deleteLockOnRelease,
             sessionMonitor, existingLock, newLockData, item,
             recordVersionNumber);
    }
} else {
    /*
     * If the version number changed since we last queried the lock,
     * then we need to update lockTryingToBeAcquired as the lock
     * has been refreshed since we last checked.
     */
    lockTryingToBeAcquired = existingLock.get();
}

Note that the code above sits inside an else block. The code in the if block makes sure the client waits at least the lease duration.

The acquire lock process can be summarized in the code below:

while (true) {

    final Optional<LockItem> existingLock = this.getLockFromDynamoDB(getLockOptions);

    try {
        if (!existingLock.isPresent()
            && !options.getAcquireOnlyIfLockAlreadyExists()) {
            return upsertAndMonitorNewLock();
        } else if (existingLock.isPresent() && existingLock.get().isReleased()) {
            return upsertAndMonitorReleasedLock();
        } else {
            if (isRecordVersionNumberEqual(lockTryingToBeAcquired, existingLock)) {
                // If the version numbers match, then we can acquire the lock,
                // assuming it has already expired.
                if (lockTryingToBeAcquired.isExpired()) {
                    return upsertAndMonitorExpiredLock();
                }
            } else {
                 // If the version number changed since we last queried the lock,
                 // then we need to update lockTryingToBeAcquired as the lock
                 // has been refreshed since we last checked.
                lockTryingToBeAcquired = existingLock.get();
            }
        }
    } catch (final Exception e) {
        // In this case we failed to acquire the lock
        if (!canWaitLonger()) {
            throw new Exception();
        }
    }

    Thread.sleep(refreshPeriodInMilliseconds);
}
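The takeover rule for a lock held by someone else (same record version number observed for a full lease duration) can be isolated into a small helper. This is a hedged sketch rather than the library's logic: ExpiryWatcher and observe are hypothetical names, time is passed in explicitly so that the behaviour is easy to follow, and the exact boundary comparison (at least one lease duration) is an assumption.

```java
/** Hypothetical helper that decides when a lock held by someone else may be taken over. */
final class ExpiryWatcher {
    private final long leaseDurationMillis;
    private String lastSeenRvn;     // version observed on an earlier poll
    private long firstSeenAtMillis; // local time when that version was first observed

    ExpiryWatcher(long leaseDurationMillis) {
        this.leaseDurationMillis = leaseDurationMillis;
    }

    /**
     * Feed in the version read from the table on each poll, with the local clock reading.
     * Returns true once the same version has been observed for at least one lease duration.
     */
    boolean observe(String currentRvn, long nowMillis) {
        if (!currentRvn.equals(lastSeenRvn)) {
            // The owner sent a heartbeat (or this is the first poll): restart the wait.
            lastSeenRvn = currentRvn;
            firstSeenAtMillis = nowMillis;
            return false;
        }
        return nowMillis - firstSeenAtMillis >= leaseDurationMillis;
    }
}
```

The design point this illustrates is that the waiting client relies only on its own clock and on the version number changing. As long as the owner keeps sending heartbeats, the version keeps rotating and the wait restarts on every change.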

Update DynamoDB table

When the client acquires a lock, it needs to update the DynamoDB table by sending a write request. In the previous section, we have seen three methods used in the acquireLock method. They are:

  1. upsertAndMonitorNewLock
  2. upsertAndMonitorReleasedLock
  3. upsertAndMonitorExpiredLock

The implementations of these methods have a similar structure. The main difference is the condition specified in the write request, as listed in the table below.

upsert method                   condition
upsertAndMonitorNewLock         ACQUIRE_LOCK_THAT_DOESNT_EXIST_PK_CONDITION
upsertAndMonitorReleasedLock    PK_EXISTS_AND_SK_EXISTS_AND_RVN_IS_THE_SAME_AND_IS_RELEASED_CONDITION
upsertAndMonitorExpiredLock     PK_EXISTS_AND_RVN_IS_THE_SAME_CONDITION
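Reading the three condition names literally, they can be approximated as predicates over the current state of the row. The sketch below is an interpretation of the constant names only: AcquireConditions, Row, and the method names are hypothetical, and the sort key part of the released-lock condition is left out.

```java
import java.util.Optional;

/** Hypothetical predicates derived from the condition names; the sort key is ignored. */
final class AcquireConditions {
    private AcquireConditions() {
    }

    /** The attributes of a lock row that the conditions look at. */
    static final class Row {
        final String rvn;
        final boolean released;

        Row(String rvn, boolean released) {
            this.rvn = rvn;
            this.released = released;
        }
    }

    /** New lock: the row must not exist yet. */
    static boolean newLock(Optional<Row> current) {
        return !current.isPresent();
    }

    /** Released lock: the row exists, the version is unchanged, and it is flagged released. */
    static boolean releasedLock(Optional<Row> current, String expectedRvn) {
        return current.isPresent()
            && current.get().rvn.equals(expectedRvn)
            && current.get().released;
    }

    /** Expired lock: the row exists and the version is unchanged since we last read it. */
    static boolean expiredLock(Optional<Row> current, String expectedRvn) {
        return current.isPresent() && current.get().rvn.equals(expectedRvn);
    }
}
```

In all three cases the predicate fails if another client changed the row between our read and our write, which is what prevents two clients from acquiring the same lock in this model.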

----- END -----
