Apache Hive: Replication Policy Scheduling

Pravin Kumar Sinha
8 min readMar 26, 2021

In my post here, I had given a brief introduction about Replication Policy Scheduling feature which is getting introduced in Hive Replication V3. This post is an attempt to provide more details on this.

At the end of the post, you should be able to know:

* What is a scheduled replication policy (also referred as replication policy)
* Changes in the syntax of REPL DUMP and REPL LOAD commands
* What is replication metrics
* How to perform CRUD operations on a replication Policy
* How to enable/disable replication policy

Before we discuss about the new replication policy scheduling feature in Hive Replication V3, just a recap on how REPL DUMP and REPL LOAD commands worked in replication V2. If you are already familiar with it, you can skip this paragraph. Replication in Hive uses two phased approach: Bootstrap and Incremental. The first iteration of replication is called bootstrap phase as it performs the full copy of the source database(data/metadata) onto the target. Second and subsequent iterations of replication are called as incremental phase as they copy only the differential (data/metadata) during that phase. The incremental runs are event based. An Event captures the details of the change in data/metadata on source database under replication. During replication, the same set of captured events are replayed onto the target database to get to the same state source cluster is in. Events has a property called event id which is monotonically increasing sequence number. This helps in capturing the delta events pending to be replayed on target database during incremental phase of replication. Additionally, each replication phase has two steps dump and load. During dump operation (performed using command ‘REPL DUMP …’) data/metadata are dumped to the staging area aka dump directory, and during load operation (performed using command ‘REPL LOAD …’) the same dump directory is used to load the data/metadata onto the target database.

Scheduled Replication Policy

Earlier, users had to manage the REPL commands’ execution on their own as frequently as they wanted. Creating a scheduled replication policy allows an end user to delegate the policy orchestration task to Hive. Hive replication leverages an already available capability in Hive called Hive Scheduled Query which allows an end user to submit any Hive query with a desired schedule. Hive replication commands are submitted as Hive Scheduled Queries in Hive. You might want to read more about Hive Scheduled Queries here. To make it clear, a scheduled replication policy is nothing but a Hive Scheduled query with an underlying query being a REPL command in stead of some other Hive SQL command.

Challenges for running REPL commands as “yet another Hive Scheduled Query”

One thing you might notice that the capability of scheduling a Hive query with desired frequency already existed in Hive, so why not just run each REPL Command as just another Hive SQL command using the same Hive query scheduler. A common staging directory could have been passed to them using the config ‘hive.repl.rootdir’ .
The short answer is “NO”, we couldn’t. The long one requires justification. There were few challenges here:

a) Challenge 1: REPL DUMP and REPL LOAD commands work in producer-consumer fashion, i.e REPL DUMP produces the metadata and dumps it to staging dir, and then REPL LOAD reads the same content and apply them on target. We can submit one Hive scheduled query for REPL DUMP command on source cluster and another Hive scheduled query for REPL LOAD command on target cluster. However, REPL LOAD can not proceed until REPL DUMP operation is complete. Similarly, post REPL DUMP operation, until REPL LOAD completes, there is no point going ahead with next REPL DUMP operation as this will end-up creating another dump directory. As such REPL LOAD operation is slower than REPL DUMP operation for obvious reasons, there is no way REPL LOAD is going to catch up with REPL DUMP’s produced dump dirs, This could lead to nasty issues like holding up files in HDFS unnecessarily. Another issue here was also that how would REPL LOAD know that dumped data is complete and not half baked and How will REPL DUMP know that previously produced files are loaded fully.

How did we solve this: There was a need to establish a communication channel between REPL DUMP and REPL LOAD schedules to convey the readiness upon completion. Since both dump and load operations were sharing the same staging directory passed using Hive config ‘hive.repl.rootdir’, we built a light-weight file based acknowledgement between REPL DUMP schedules (running on source cluster) and REPL LOAD schedules(running on target cluster) to communicate with each other.

REPL DUMP operation places an acknowledgment file ‘_finished_dump’ in the dump dir at the end of REPL DUMP operation to mark the completion.

REPL LOAD operation would look for the latest dump dir with a dump acknowledgment file “_finished_dump”, and then only would proceed with that staging dir, else it will skip the load process in current iteration.

On the other hand, upon completion, REPL LOAD operation places an acknowledgment file “_finished_load”, in the dump dir to mark the completion of load operation.

Similarly the REPL DUMP operation will look into latest dump dir. If there is directory with dump acknowledgement file “_finished_dump” present but no load acknowledgement file “_finished_load”, it skips performing dump operation in current run.

b) Challenge 2: The second problem was the syntax of REPL DUMP command which was different for bootstrap and incremental phases.
bootstrap: REPL DUMP <SourceDbName>
incremental: REPL DUMP <SourceDbName> FROM <eventID>
Since there was a single query for REPL DUMP operation to be submitted to Hive Query Scheduler, how will the query would have changed to adopt new syntax when the replication transitions from bootstrap to incremental phase.

How did we solve this:
As a part of REPL DUMP operation, Hive replication already creates a dump metadata file “_dumpmetadata” in the dump directory. The file consists of tab separated info like: phase(BOOTSTRAP/INCREMENTAL), from Event ID, to Event ID, plus few other data.
* The first thing we did was to have a uniform syntax for both bootstrap and incremental dump operations. So now the syntax used for dump operation is always: REPL DUMP <sourceDBName> regardless of bootstrap or incremental dump operation. This immediately solved the issue of having different Hive scheduled query requirement during bootstrap and incremental dump for the same database replication
* To get the from event id during incremental run, user earlier used to pass it in REPL DUMP command using FROM keyword. Now Hive reads the dump metadata file “_dumpmetadata” from the last loaded dump directory. This way dump operation knows exactly until what event Id, the events have been replayed on target and hence can dump events which are generated only after that.

c) Challenge 3: Multiple replication policies can choose to have the same value for the hive config ‘hive.repl.rootdir’. So what? Earlier in replication V2, this wasn’t an issue. But now in order to implement file based acknowledgement correctly(as stated above as problem 1), it was important to separate the dump directories for replication policies.

How did we solve this: In replication V2, Hive used to generate a UUID based dump directory inside the directory root path supplied by hive config ‘hive.repl.rootdir’. So, for each dump operation the dump directory would be something like:
${hive.repl.rootdir}/UUID.randomUUID().toString()
In replication V3, in order to implement file based acknowledgement correctly, we needed to have a unique base dump dir path for each replication policy under which we can place a UUID based dump dir per dump operation. The easier choice was to tell the user to make sure the value of config ‘hive.repl.rootdir’ for each replication policy is unique. But to avoid any human error due to manually making sure that the path is unique, we decided to automatically achieve the uniqueness in the dump dir base path under which each dump operation can place a UUID based dump dir.

* In repl V3, his is how Hive computes the final dump dir, during each dump:
${hive.repl.rootdir}/base64Encode(srcDB)/UUID.randomUUID().toString()

* On target cluster during load operation also we needed to compute in the same way for which source dbName was required in load command, and so the REPL LOAD command now looks something like:
REPL LOAD <srcDB> INTO <targetDB>

d) Challenge 4: Hive Scheduled Query is very generic, given that now the REPL DUMP and REPL LOAD operations had to run as scheduled queries, how would user know what is being dumped/loaded in each execution of REPL commands, whether the dump or load skipped the execution because the other one had not produced/consumed the dump content.

How did we solve this: As a part of each replication operation executions (both dump and load), Hive now stores a parse-able JSON blob having the replication metrics. The replication metrics are stored in a separate table inside sys db: “sys.replication_metrics”. The same table can be queried to check the progress or status of the replication policies.

CRUD operations on a replication Policy

All the crud operations can be performed using beeline. Most of the queries would be similar to what other Hive Scheduled Query would look like;

a) Create a replication policy for REPL DUMP operation(on source cluster):
CREATE SCHEDULED QUERY <repl_policy_name> EVERY 30 minutes AS REPL DUMP <SrcDbName> with (comma separated configs in ‘key’=’val’ format) [EXECUTED AS <userName>];

Example:
CREATE SCHEDULED QUERY repl_policy1 EVERY DAY AT ‘11:35:30’ AS REPL DUMP db1 WITH(‘hive.repl.rootdir’=’hdfs://<NameNodeHost:port>/user/hive/repl’);

b) Create a replication policy for REPL LOAD operation(on target cluster):
CREATE SCHEDULED QUERY <repl_policy_name> EVERY 30 minutes AS REPL LOAD <SrcDbName> INTO <TargetDbName>with (comma separated configs in ‘key’=’val’ format) [EXECUTED AS <userName>]

Example:
CREATE SCHEDULED QUERY repl_policy1 EVERY DAY AT ‘11:45:30’ AS REPL LOAD db1 INTO db1 with(‘hive.repl.rootdir’= ‘hdfs://<NameNodeHost:port>/user/hive/repl’);

[Note: For both a) and b) make sure same value is passed for config “hive.repl.rootdir”, and that should be a fully qualified hdfs path. i.e, though NameNodeHost:port above could be either from source or target cluster, but has to be same value for both REPL DUMP and REPL LOAD]

c) Retrieve all the submitted replication policies:
SELECT * FROM information_schema.scheduled_queries;
[Note: Above query will give you all the scheduled queries. You can choose to name the replication one with some prefix(like repl_) during creation and use that as a differentiator during SELECT query]

Example:
SELECT * FROM information_schema.scheduled_queries WHERE schedule_name LIKE ‘repl_%’;

d) Update a replication policy:
ALTER SCHEDULED QUERY <repl_policy_name>
(<scheduleSpec>|<executedAsSpec>|<enableSpecification>|<definedAsSpec>|<executeSpec>);

e) On-demand execution:
ALTER SCHEDULED QUERY <policyName> EXECUTE;
Even though there is a frequency mentioned while submitting the policy, you can still execute the same policy in on-demand basis.

Example:
ALTER SCHEDULED QUERY repl_policy1 EXECUTE;

f) Delete a replication policy:
DROP SCHEDULED QUERY <repl_policy_name>;

g) Enable a replication policy (by default enabled)
ALTER SCHEDULED QUERY repl_policy_name enabled;

h) Disable a replication policy
ALTER SCHEDULED QUERY repl_policy_name disabled;

i) Get status of all executed policy instance:
Once you submit a replication policy, in order to check if this has run or the status of the run etc, you can use query:
SELECT * FROM information_schema.scheduled_executions WHERE schedule_name=’<repl_policy_name>’;

Example:
SELECT * FROM information_schema.scheduled_executions WHERE schedule_name=’repl_policy1';

j) In order to retrieve the metrics of replication policy, use this query:
SELECT * FROM sys.replication_metrics WHERE policy_name=’<repl_policy_name>';

Example:
SELECT * FROM sys.replication_metrics WHERE policy_name=’repl_policy1';

--

--