/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.BackupImage;
import org.apache.hadoop.hdfs.server.namenode.BackupNode;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;

/**
 * Daemon thread that periodically creates a checkpoint on behalf of a
 * {@link BackupNode}.
 *
 * <p>The {@link #run()} loop wakes up at a fixed interval and triggers
 * {@link #doCheckpoint()} when either the configured checkpoint period has
 * elapsed or the number of un-checkpointed transactions reported by the remote
 * name-node reaches the configured threshold.
 */
class Checkpointer extends Daemon {
    public static final Log LOG = LogFactory.getLog(Checkpointer.class.getName());

    /** Upper bound, in seconds, on the interval between two checks of the run loop. */
    private static final long MAX_CHECK_PERIOD_SECS = 300L;

    private final BackupNode backupNode;
    /** Loop flag for {@link #run()}; cleared by {@link #shutdown()}. */
    volatile boolean shouldRun;
    /** Host part of the configured backup-node HTTP address. */
    private String infoBindAddress;
    private CheckpointConf checkpointConf;
    private final Configuration conf;

    /** Returns the backup node's image, cast to {@link BackupImage}. */
    private BackupImage getFSImage() {
        return (BackupImage)this.backupNode.getFSImage();
    }

    /** Returns the proxy the backup node uses to talk to the remote name-node. */
    private NamenodeProtocol getRemoteNamenodeProxy() {
        return this.backupNode.namenode;
    }

    /**
     * Creates a checkpointer for the given backup node.
     *
     * @param conf configuration to read checkpoint settings from
     * @param bnNode the backup node this checkpointer works for
     * @throws IOException if initialization fails; the checkpointer (and the
     *         backup node) is shut down before the exception is rethrown
     */
    Checkpointer(Configuration conf, BackupNode bnNode) throws IOException {
        this.conf = conf;
        this.backupNode = bnNode;
        try {
            this.initialize(conf);
        }
        catch (IOException e) {
            LOG.warn("Checkpointer got exception", e);
            this.shutdown();
            throw e;
        }
    }

    /**
     * Reads the checkpoint configuration and the HTTP bind host, and enables
     * the run loop.
     *
     * @param conf configuration to read from
     * @throws IOException declared for callers; propagated from configuration
     *         handling if it fails
     */
    private void initialize(Configuration conf) throws IOException {
        this.shouldRun = true;
        this.checkpointConf = new CheckpointConf(conf);

        String fullInfoAddr = conf.get("dfs.namenode.backup.http-address", "0.0.0.0:50105");
        // A value without a ":port" suffix used to blow up with
        // StringIndexOutOfBoundsException (indexOf returned -1); treat the
        // whole value as the host in that case.
        int portSeparator = fullInfoAddr.indexOf(':');
        this.infoBindAddress = portSeparator < 0
                ? fullInfoAddr
                : fullInfoAddr.substring(0, portSeparator);

        long periodSecs = this.checkpointConf.getPeriod();
        LOG.info("Checkpoint Period : " + periodSecs + " secs " + "(" + periodSecs / 60L + " min)");
        LOG.info("Transactions count is  : " + this.checkpointConf.getTxnCount() + ", to trigger checkpoint");
    }

    /** Stops the run loop and stops the owning backup node. */
    void shutdown() {
        this.shouldRun = false;
        this.backupNode.stop();
    }

    /**
     * Main loop: checks at a fixed interval whether a checkpoint is due and,
     * if so, performs it.
     *
     * <p>The interval is the configured checkpoint period, capped at
     * {@link #MAX_CHECK_PERIOD_SECS}. An {@link IOException} from a checkpoint
     * attempt is logged and the loop continues; any other {@link Throwable}
     * shuts the checkpointer down and ends the loop.
     */
    @Override
    public void run() {
        long periodMSec = Math.min(MAX_CHECK_PERIOD_SECS, this.checkpointConf.getPeriod()) * 1000L;

        // Track a requested startup checkpoint with an explicit flag instead of
        // pretending the last checkpoint happened at clock value 0: the
        // monotonic clock is presumably nanoTime-based, whose origin is
        // arbitrary, so "now >= 0 + period" is not guaranteed to hold.
        boolean checkpointPending = this.backupNode.shouldCheckpointAtStartup();
        long lastCheckpointTime = Time.monotonicNow();

        while (this.shouldRun) {
            try {
                long now = Time.monotonicNow();
                boolean shouldCheckpoint = checkpointPending
                        || now - lastCheckpointTime >= periodMSec;
                if (!shouldCheckpoint) {
                    // Period not elapsed yet: fall back to the transaction-count trigger.
                    shouldCheckpoint =
                            this.countUncheckpointedTxns() >= this.checkpointConf.getTxnCount();
                }
                if (shouldCheckpoint) {
                    this.doCheckpoint();
                    // Only reached on success, so a failed attempt is retried
                    // on the next iteration.
                    lastCheckpointTime = now;
                    checkpointPending = false;
                }
            }
            catch (IOException e) {
                LOG.error("Exception in doCheckpoint: ", e);
            }
            catch (Throwable e) {
                LOG.error("Throwable Exception in doCheckpoint: ", e);
                this.shutdown();
                break;
            }
            try {
                Thread.sleep(periodMSec);
            }
            catch (InterruptedException ignored) {
                // Deliberately best-effort: an interrupt only cuts the sleep
                // short and the loop condition re-checks shouldRun. The
                // interrupt flag is not re-asserted because that would make
                // every following sleep return immediately.
            }
        }
    }

    /**
     * Computes how many transactions the remote name-node has beyond the most
     * recent checkpoint recorded in local storage.
     *
     * @return remote current transaction id minus the most recent checkpoint
     *         transaction id
     * @throws IOException if the remote transaction id cannot be obtained
     */
    private long countUncheckpointedTxns() throws IOException {
        long curTxId = this.getRemoteNamenodeProxy().getTransactionID();
        long uncheckpointedTxns = curTxId - this.getFSImage().getStorage().getMostRecentCheckpointTxId();
        assert (uncheckpointedTxns >= 0L);
        return uncheckpointedTxns;
    }

    /**
     * Creates a new checkpoint.
     *
     * <p>Steps, in order: announce the checkpoint to the remote name-node,
     * fetch the edit-log manifest after the last applied transaction, download
     * an image first if the logs alone cannot be used to roll forward, download
     * and apply the edit logs, save the image under the namesystem write lock,
     * optionally upload it, and finally report the end of the checkpoint.
     *
     * @throws IOException if the name-node requests shutdown, returns an
     *         unsupported command, the logs cannot be rolled forward, or any
     *         transfer/storage step fails
     */
    void doCheckpoint() throws IOException {
        BackupImage bnImage = this.getFSImage();
        NNStorage bnStorage = bnImage.getStorage();
        long startTime = Time.monotonicNow();
        bnImage.freezeNamespaceAtNextRoll();
        NamenodeCommand cmd = this.getRemoteNamenodeProxy().startCheckpoint(this.backupNode.getRegistration());
        CheckpointCommand cpCmd = null;
        // NOTE(review): 50 and 51 look like action constants inlined by the
        // decompiler (presumably NamenodeProtocol.ACT_SHUTDOWN and
        // ACT_CHECKPOINT) - confirm and restore the symbolic names.
        switch (cmd.getAction()) {
            case 50: {
                this.shutdown();
                throw new IOException("Name-node " + this.backupNode.nnRpcAddress + " requested shutdown.");
            }
            case 51: {
                cpCmd = (CheckpointCommand)cmd;
                break;
            }
            default: {
                throw new IOException("Unsupported NamenodeCommand: " + cmd.getAction());
            }
        }
        bnImage.waitUntilNamespaceFrozen();
        CheckpointSignature sig = cpCmd.getSignature();
        sig.validateStorageInfo(bnImage);
        long lastApplied = bnImage.getLastAppliedTxId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
        }
        RemoteEditLogManifest manifest = this.getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1L);
        boolean needReloadImage = false;
        if (!manifest.getLogs().isEmpty()) {
            RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
            // A gap between the last applied transaction and the first
            // available log means the logs alone are not enough; fetch the
            // image of the most recent remote checkpoint first.
            if (firstRemoteLog.getStartTxId() > lastApplied + 1L) {
                LOG.info("Unable to roll forward using only logs. Downloading image with txid " + sig.mostRecentCheckpointTxId);
                MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(this.backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId, bnStorage, true);
                bnImage.saveDigestAndRenameCheckpointImage(NNStorage.NameNodeFile.IMAGE, sig.mostRecentCheckpointTxId, downloadedHash);
                lastApplied = sig.mostRecentCheckpointTxId;
                needReloadImage = true;
            }
            // Still a gap even relative to the downloaded image: give up.
            if (firstRemoteLog.getStartTxId() > lastApplied + 1L) {
                throw new IOException("No logs to roll forward from " + lastApplied);
            }
            for (RemoteEditLog log : manifest.getLogs()) {
                TransferFsImage.downloadEditsToStorage(this.backupNode.nnHttpAddress, log, bnStorage);
            }
            if (needReloadImage) {
                LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
                File file = bnStorage.findImageFile(NNStorage.NameNodeFile.IMAGE, sig.mostRecentCheckpointTxId);
                bnImage.reloadFromImageFile(file, this.backupNode.getNamesystem());
            }
            Checkpointer.rollForwardByApplyingLogs(manifest, bnImage, this.backupNode.getNamesystem());
        }
        long txid = bnImage.getLastAppliedTxId();
        this.backupNode.namesystem.writeLock();
        try {
            this.backupNode.namesystem.setImageLoaded();
            if (this.backupNode.namesystem.getBlocksTotal() > 0L) {
                this.backupNode.namesystem.setBlockTotal();
            }
            bnImage.saveFSImageInAllDirs(this.backupNode.getNamesystem(), txid);
            bnStorage.writeAll();
        }
        finally {
            this.backupNode.namesystem.writeUnlock("doCheckpoint");
        }
        if (cpCmd.needToReturnImage()) {
            TransferFsImage.uploadImageFromStorage(this.backupNode.nnHttpAddress, this.conf, bnStorage, NNStorage.NameNodeFile.IMAGE, txid);
        }
        this.getRemoteNamenodeProxy().endCheckpoint(this.backupNode.getRegistration(), sig);
        if (this.backupNode.getRole() == HdfsServerConstants.NamenodeRole.BACKUP) {
            bnImage.convergeJournalSpool();
        }
        this.backupNode.setRegistration();
        long imageSize = bnImage.getStorage().getFsImageName(txid).length();
        LOG.info("Checkpoint completed in " + (Time.monotonicNow() - startTime) / 1000L + " seconds." + " New Image Size: " + imageSize);
    }

    /**
     * Builds a URL from the configured HTTP scheme, the bind host read at
     * initialization and the backup node's HTTP port.
     *
     * <p>Not referenced anywhere else in this class.
     *
     * @return the assembled URL
     * @throws RuntimeException wrapping a {@link MalformedURLException} if the
     *         pieces do not form a valid URL
     */
    private URL getImageListenAddress() {
        InetSocketAddress httpSocAddr = this.backupNode.getHttpAddress();
        int httpPort = httpSocAddr.getPort();
        try {
            return new URL(DFSUtil.getHttpClientScheme(this.conf) + "://" + this.infoBindAddress + ":" + httpPort);
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies the edit logs listed in a manifest to an image.
     *
     * <p>Logs whose end transaction id is not beyond the image's last applied
     * transaction id are skipped; for each remaining log the finalized edits
     * file is looked up in the image's storage and handed to
     * {@code loadEdits}.
     *
     * @param manifest the remote edit logs to apply
     * @param dstImage the image to roll forward
     * @param dstNamesystem the namesystem the edits are loaded into
     * @throws IOException if an edits file cannot be located or loaded
     */
    static void rollForwardByApplyingLogs(RemoteEditLogManifest manifest, FSImage dstImage, FSNamesystem dstNamesystem) throws IOException {
        NNStorage dstStorage = dstImage.getStorage();
        ArrayList<EditLogInputStream> editsStreams = Lists.newArrayList();
        for (RemoteEditLog log : manifest.getLogs()) {
            if (log.getEndTxId() <= dstImage.getLastAppliedTxId()) {
                // Already covered by the image.
                continue;
            }
            File f = dstStorage.findFinalizedEditsFile(log.getStartTxId(), log.getEndTxId());
            editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), log.getEndTxId(), true));
        }
        LOG.info("Checkpointer about to load edits from " + editsStreams.size() + " stream(s).");
        dstImage.loadEdits(editsStreams, dstNamesystem);
    }
}

