On Multiple Node Reconciliation

Last modified 22 Apr 2021 16:10 +02:00

The performance of reconciliation process is to be improved. How we can utilize multiple nodes to do that?

Reconciliation consists of three steps:

Step Name Description

1

scan for unfinished operations

Scans shadows for unfinished operations and tries to finish them. Normally it should be quite fast: no need to optimize this part (yet).

2

resource reconciliation

Iterates through resource objects. Invokes synchronization for each one (using SynchronizeAccountResultHandler). This part can be currently executed in multiple threads and should be made distributable onto multiple nodes.

3

shadow reconciliation

Iterates through shadows (currently all of them; might be restricted using fullSynchronizationTimestamp property). It tries to retrieve each shadow to see if it is still present on the resource. We could consider distributing this part as well (after the previous one is done).

General questions first

Distributing resource reconciliation

There are two options here.

  1. Distributed fetching - if the resource supports partitioning of objects based on some attribute, the situation is easier: each node would take its own share of the accounts to be processed.

  2. Centralized fetching - if not, there should be a coordinator node that would fetch the objects and distribute them into worker ones.

We will implement distributed fetching first.

Distributing shadow reconciliation

Whether to distribute this part depends on where the bottleneck is:

  1. If the resource itself is slow, there’s no point in invoking "get resource object" operation from multiple nodes simultaneously.

  2. If the resource is quite fast, distribution of get operations might make sense.

This will wait for now.

Master-slave (Coordinator-worker) task orchestration

Regardless of what work distribution mechanism is applied (i.e. single search with object distribution or multiple independent searches, divided into ranges), we need a mechanism to orchestrate participating tasks.

One of the tasks will be master (coordinator) whose responsibilities are:

  1. Serve as a single point of management:

    1. there should be a function "suspend the coordinator and all his workers" (besides the other, less common, variant, "suspend the coordinator but let workers execute"),

    2. there should be a function "delete the coordinator and all his workers",

    3. coordinator should provide aggregate statistics for all his workers, including listing objects that were not successfully processed (note that we should perhaps store worker task oid into operationExecution/taskRef - but the "Errors" tab on coordinator should query repo for errors recorded under OIDs of all his workers),

  2. Start his workers, and

    1. in case of object distribution: feed objects to them,

    2. in case of independent searches: distribute processing ranges to them. (Or, at least, provide data that will allow workers to obtain ranges to be processed.)

Another point to consider - will the worker tasks be single or multi-threaded?

  1. If multi-threaded: There is one worker task per node; and it has a number of lightweight asynchronous subtasks (threads).

  2. If single-threaded: There is a couple of worker tasks per node; each runs exactly one thread.

Option #2 seems to be easier to implement.

Work distribution

Centralized vs decentralized work state recording

There are two options:

  1. Centralized work state recording: Exact bucket state (including detailed progress) is kept at a central place: in the coordinator task.

  2. Decentralized work state recording: The coordinator only knows about basic bucket state (ready, delegated, complete). Details are stored in the worker tasks.

Centralized state

Advantages:

  1. No special consistency measures are needed. Each update deals with only one task.

  2. Simplicity: the whole state is kept in one place.

Disadvantages:

  1. Update contention: When updating state (e.g. by 64 worker tasks) a single data structure is to be updated. Bucket allocation is probably not that frequent but fine-grained state update might be. The solution does not scale well to larger number of workers.

Decentralized state

Advantages:

  1. Less update contention.

Disadvantages:

  1. Requires atomic update of coordinator and worker task work state.

  2. To learn about (detailed) overall state, all workers have to be retrieved.

  3. In order to do that, each bucket can be in one of the following processing/allocation states:

    1. free (initial, waiting) - the bucket waits to be allocated to a worker

    2. allocated - the bucket has been allocated; its processing might or might not already start

    3. complete - the bucket processing is complete. Nothing more to be done there.

Suspend and resume (for simple, multi-threaded as well as multi-node tasks)

  1. The task keeps the processing state in workState container. It is basically a list of buckets with their state.

  2. When resuming we simply take this state and restart processing from it.

The state is the same as the state used to distribute the work in multi-node scenario.

Multi-node tasks

  1. Suspension of a single worker: Nothing special occurs - other workers keep processing their buckets and obtaining others. Ultimately, all buckets except for the one (ones) delegated to the suspended worker are done. Then we have to release the buckets somehow, for them to be processed as well.

  2. Suspension of the whole task tree:

    1. Either coordinator and workers are suspended; after resume they simply start working when they ended.

    2. Or coordinator is suspended and workers are closed/deleted: This is quite unfortunate, as release procedure for buckets would need to be invoked.

Single-node tasks

The task has to store its work state in itself. TODO

Work state manager methods

  1. getWorkBucket - used when the task wants to start or continue working on a bucket.

  2. updateWorkBucket - update the state of the work bucket.

  3. completeWorkBucket - the work on the bucket is complete.

Multi-node scenario

getWorkBucket

  1. Return self-allocated work bucket, if there is any.

  2. Return unallocated ready work bucket, if there is any. (And allocate it.)

  3. Create ready work bucket, if there are any to be created. (And allocate it.)

  4. Try to resolve any mis-allocated buckets and continue at point 2.

Mis-allocated buckets are such buckets that are marked as allocated but the corresponding task is either closed or does not exist anymore.

There is a difference between "no more buckets can be (definitely) found" and "there are (currently) no free buckets; some are allocated". In the former case, the caller can safely finish. In the latter one, the caller should wait to see if the buckets would not become available in the meanwhile.

updateWorkBucket

  1. The state of the bucket is updated (locally or centrally).

completeWorkBucket

  1. The bucket is simply marked as complete in the coordinator task.

  2. The bucket is removed from the workState of the worker task.

Single-node scenario

getWorkBucket

All buckets are implicitly self-allocated.

  1. Return self-allocated work bucket, if there is any.

  2. Create ready work bucket, if there are any to be created. (And allocate it.)

updateWorkBucket

  1. The state of the bucket is updated.

completeWorkBucket

  1. The bucket is simply marked as complete.

When working single-node (either single-threaded or multi-threaded with centralized fetching) the workState has to be used in a bit different way. We have no buckets there: each item is a "bucket" in itself. For space reasons we do not store all items there. The algorithm is like this - assuming multi-threaded processing:

  1. When an item is fetched and queued for processing, a bucket for it is added into workState with the state of allocated.

  2. As soon as it is processed, the state is changed to complete.

  3. To conserve space, a sequence of consecutive complete buckets can be shrunken, keeping only the last one.

Each task handler can provide its own data structure there.

On the state item
-----------------
Naive approach: what was the latest processed object?
Issues:
 1. multithreading: we fetch objects sequentially and offer them to worker
    threads; so it might be that e.g. 1-24: processed, 25: processing,
    26: processing, 27: processed, etc.
 2. multinode/bucketing: it might be much more complex ... see below



Cassandra case nr.1:
 - bucket is defined as [start, end) interval; e.g. if we have 2^64 IDs and
   24K buckets,
    - first bucket is [0, 384307168202282);
    - second bucket is [384307168202282, 768614336404564);
    - and so on

Cassandra case nr.2:
 - bucket is defined as [start, end) just like in the previous case; but
   it also contains a list of accounts to be processed - or, at least, their
   number.
So we can track progress more precisely.

LDAP case:
 - bucket is defined as

DB case:
 - bucket is

AIS case:
 - bucket is defined as [id-start, id-end) interval; e.g. [0-1000), [1000-1999), ...
 - progress is

CDO case:
 - buckets are: 000xxxxx, 001xxxxx, 002xxxxx, ... (100.000 users in 1000 buckets)
 - optionally with the number of users within it
 - progress: last processed UOC within bucket

Recompute case:
 - buckets are: [0-1000), [1000-2000), ... - this defines users by their indices
   when doing sorted list (by username)
 - progress is last user number (or last username) processed
 - disadvantage: when users are renamed, they can migrate between buckets; and
   boundaries of buckets can move

Recompute case 2:
 - buckets are defined by usernames (from-to)
 - when users are renamed, they can migrate between buckets; however, buckets
   boundaries will stay unchanged

Recompute case 3:
 - buckets are defined by OIDs (from-to)
 - renaming users has no effect on their placement within buckets nor buckets boundaries

So we need:
- AbstractTaskBucketType (and its concrete subtypes)
    - state (initial, processing, complete)
    - identification ?
    - boundary specification
    - progress specification (indicative, operative)



On storing bucket information
=============================
1. Initial and complete buckets should take minimal amount of space
2. Buckets in progress can be more verbose

E.g. if we would have 100.000 buckets each consuming 500 bytes of information, this is:
100.000 \* 200 = 20.000.000 bytes.

<bucket>
  <id>23840</id>
  <state>initial</state>
  <from>384307168202282321</from>
  <to>38430716820228299999</to>
</bucket>

<bucket>
  <id>23840</id>
  <state>allocated</state>
  <taskRef oid="..."/>
  <from>384307168202282321</from>
  <to>38430716820228299999</to>
</bucket>

alt.

<bucket>23840:initial:384307168202282321:384307168202299999</bucket>

Worker task will have currentBucket with an information about what is in progress.

----------------------

Coordinator task:
 1. create bucket information
 2. create worker tasks
 3. wait for worker tasks completion

Worker task:
 1. obtain a bucket (something in initial state)
    - if none available, exit
 2. work on it, updating it in currentBucket item
 3. when completed, update data structure in coordinator task and go to point 1

What about collisions?
Two worker tasks try to take the same bucket.
We do it like this:
1. get task
2. select free bucket (id=X)
3. modify: update bucket[X].state = allocated
   with precondition: bucket[X].state = initial

First task will succeed, second one will get precondition violation exception.

Updates should not be frequent in order to avoid contention on the master task.

Alternatively, master task can pre-allocate buckets to worker tasks; so each one could work on its share without much overhead.

Framework / things to customize:

1. bucket structure
2. initial buckets creation (java / expressions / strategy ...)
3. current bucket processing
   e.g. conversion from bucket to query


AbstractTaskWorkBucketType
 - id (container id)
 - state
 - taskRef

CassandraTaskWorkBucketType
 - from, to (long)

RepositoryOidTaskWorkBucketType
 - from, to (string)
 - records
 - processedRecords
 - lastProcessedOid

RepositoryNameTaskWorkBucketType
 - from, to (string)
 - records
 - processedRecords
 - lastProcessedName

RepositoryIndexedWorkBucketType
 - from, to (int)
 - lastProcessedItem (int)