AdaHandleProcessor.java

package org.cardanofoundation.tools.adahandle.resolver.storage;

import com.bloxbean.cardano.yaci.store.common.domain.AddressUtxo;
import com.bloxbean.cardano.yaci.store.common.domain.Amt;
import com.bloxbean.cardano.yaci.store.events.RollbackEvent;
import com.bloxbean.cardano.yaci.store.events.internal.CommitEvent;
import com.bloxbean.cardano.yaci.store.utxo.domain.AddressUtxoEvent;
import lombok.RequiredArgsConstructor;
import org.cardanofoundation.tools.adahandle.resolver.service.AdaHandleHistoryService;
import org.cardanofoundation.tools.adahandle.resolver.service.AdaHandleService;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Component
@RequiredArgsConstructor
public class AdaHandleProcessor {
    private final AdaHandleService adaHandleService;
    private final AdaHandleHistoryService adaHandleHistoryService;

    private List<SlotAddressUtxo> slotAddressUtxosCache = Collections.synchronizedList(new ArrayList<>());


    @EventListener
    public void processAdaHandle(AddressUtxoEvent addressUtxoEvent) {
        var addressUtxoList = addressUtxoEvent.getTxInputOutputs()
                .stream().flatMap(txInputOutput -> txInputOutput.getOutputs().stream())
                .filter(this::includesAdaHandle).toList();

        if (addressUtxoList.isEmpty()) {
            return;
        }

        var slotAddressUtxo = new SlotAddressUtxo(addressUtxoEvent.getEventMetadata().getSlot(), addressUtxoList);
        slotAddressUtxosCache.add(slotAddressUtxo);
    }

    public boolean includesAdaHandle(AddressUtxo addressUtxo) {
        final String ADA_HANDLE_POLICY_ID = "f0ff48bbb7bbe9d59a40f1ce90e9e9d0ff5002ec48f232b49ca0fb9a";
        List<Amt> amounts = addressUtxo.getAmounts();

        if (amounts != null) {
            for (final Amt amount : amounts) {
                if (amount.getPolicyId() != null && amount.getPolicyId().equals(ADA_HANDLE_POLICY_ID)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * This is invoked once per block batch (100 blocks) at the end of the block processing during initial sync when
     * parallel processing is enabled.
     * <p>
     * After the sync reaches tip, batch size is 1 and this method is invoked for each block.
     */
    @EventListener
    @Transactional
    public void handleCommitEvent(CommitEvent commitEvent) {

        if (slotAddressUtxosCache.isEmpty()) {
            return;
        }

        //Sort slotAddressdUtxosCache by slot in ascending order to process the utxos in the correct order
        slotAddressUtxosCache.sort((o1, o2) -> Long.compare(o1.slot(), o2.slot()));

        try {
            // Process the cached slotAddressUtxos
            for (SlotAddressUtxo slotAddressUtxo : slotAddressUtxosCache) {
                adaHandleService.saveAllAdaHandles(slotAddressUtxo.addressUtxos());
                adaHandleHistoryService.saveAdaHandleHistoryItems(slotAddressUtxo.addressUtxos());
            }

        } finally {
            // Clear the cache after processing
            slotAddressUtxosCache.clear();
        }
    }

    @EventListener
    @Transactional
    public void handleRollback(RollbackEvent rollbackEvent) {
        adaHandleHistoryService.rollbackToSlot(rollbackEvent.getRollbackTo().getSlot());
    }

}

record SlotAddressUtxo(long slot, List<AddressUtxo> addressUtxos) {
}