在去中心化金融(DeFi)领域,实时监控借贷协议中借款人的健康状况至关重要。本文将详细介绍如何利用流处理技术与无服务器函数,构建一个自动化系统来追踪 Aave V3 协议上借款人的健康因子变化。
概述
区块链项目的服务器端基础设施管理往往复杂且耗时。无服务器函数提供了一种高效的解决方案,使开发者能够专注于业务逻辑而非底层架构。通过结合流处理技术,我们可以实时捕获链上事件并触发自定义处理逻辑,无需自行维护监听节点或扩展服务器资源。
在本方案中,我们将通过以下步骤实现监控功能:
- 设置数据流以捕获 Aave V3 资金池合约的借款事件
- 开发处理函数来提取借款人地址并查询实时健康因子
- 将关键数据存储在键值存储系统中供后续查询
- 通过 Webhook 接收实时通知并验证系统运行状态
核心概念解析
流处理技术
流处理技术提供来自多条区块链的实时和历史区块数据。它支持灵活的数据路由选项,可将数据发送到 Webhooks、云存储桶、数据库等多种目的地。该技术提供可自定义的数据架构,允许在数据传输前进行过滤和转换,从而实现精确的数据解析,同时由平台管理底层的 RPC 基础设施。
无服务器函数
无服务器函数解决了区块链开发者面临的一个普遍挑战:管理和扩展实时监听区块链的服务器端脚本。使用函数即服务(FaaS),开发者可以专注于编写代码,而不必担心基础设施的维护。
函数可以使用 JavaScript 或 Python 创建,并支持外部库如各种 Web3 SDK。主要特点包括:
- API 就绪:函数自动转换为 API,可直接从前端或其他服务调用
- 区块链优化:与流处理系统无缝协作,在新区块链数据到达时激活
- 存储集成:轻松访问和管理函数中的存储数据
- 成本效益:按使用量付费,无前期费用或长期承诺
- 弹性扩展:在全球负载均衡、自动扩展的基础设施上运行
键值存储 API
键值存储 API 为无状态函数提供了状态管理能力。通过在函数中存储和检索键值对数据,开发者可以构建更复杂的应用逻辑。不仅可以通过函数本身访问这些值,还能通过 REST API 进行操作,使其可以从任何应用程序调用。
Aave 协议与健康因子机制
Aave 是一个知名的抵押借贷协议,允许用户存款、借款和参与清算过程。当借款人的健康因子低于 1 时,其头寸就面临被清算的风险。
健康因子是衡量借款人偿还能力的关键指标,计算公式基于抵押资产价值与借款金额的比率。要监控头寸的健康状况,可以调用 Aave V3 资金池合约的 getUserAccountData() 函数,该函数返回账户数据,包括在所有借贷资产上的健康因子。
我们的监控系统将实现以下功能:
- 在键值存储中初始化借款人列表
- 处理来自借款事件的借款人地址
- 创建(借款人地址,健康因子)对集合,跟踪每个借款人的当前头寸状况
- 将这些信息存储在键值存储中
- 通过 REST API 调用键值存储来检索数据
构建监控系统
准备工作
在开始构建前,需要准备以下资源:
- 一个可用的区块链节点服务账号
- 对 JavaScript 和以太坊交易及事件的基本理解
- 以太坊主网的 RPC 端点
- 一个可接收数据的 Webhook URL
创建流处理器
首先需要设置流处理器来捕获 Aave V3 合约的实时借款事件。流处理器配置如下:
- 区块链:以太坊
- 网络:主网
- 数据集:交易数据
- 流起始点:最新区块(可根据需要调整)
- 流负载处理:在流式传输前修改数据
- 重组处理:使用默认设置
流处理器的核心是一个过滤函数,它识别 Aave V3 资金池合约上的借款操作:
// 流处理函数示例代码
function main(stream) {
try {
const data = stream.data;
var transactions = data[0];
const AAVE_V3_POOL_ADDRESS = '0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2'.toLowerCase();
const BORROW_FUNCTION_SIGNATURE = '0xa415bcad'; // borrow() 的函数签名
// 过滤交易
var filteredList = transactions.filter(tx => {
return tx.to &&
tx.to.toLowerCase() === AAVE_V3_POOL_ADDRESS &&
tx.input.startsWith(BORROW_FUNCTION_SIGNATURE);
});
// 提取借款人地址
var borrowers = filteredList.map(tx => {
return {
block: parseInt(tx.blockNumber, 16),
transactionHash: tx.hash,
borrower: tx.from,
};
});
if (borrowers.length === 0) {
return {
block: parseInt(transactions[0].blockNumber, 16),
message: "在该区块中未找到借款交易"
};
}
return { borrowers };
} catch (e) {
return { error: e.message };
}
}此函数分析每个区块中的交易,筛选出发送到 Aave V3 资金池地址且包含借款函数签名的交易。对于每个符合条件的交易,它提取区块号、交易哈希和借款人地址(即交易的发送方)。
创建处理函数
接下来创建无服务器函数来处理流处理器捕获的事件:
// 函数处理示例代码
const https = require('https');
const ethers = require('ethers');
// 常量和配置
const AAVE_V3_POOL_ADDRESS = '0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2';
const RPC_URL = "你的_RPC_URL";
const WEBHOOK_TOKEN = "你的_WEBHOOK_URL";
// Aave V3 资金池ABI片段
const AAVE_V3_POOL_ABI = [
{
"inputs": [{"internalType": "address", "name": "user", "type": "address"}],
"name": "getUserAccountData",
"outputs": [
{"internalType": "uint256", "name": "totalCollateralBase", "type": "uint256"},
{"internalType": "uint256", "name": "totalDebtBase", "type": "uint256"},
{"internalType": "uint256", "name": "availableBorrowsBase", "type": "uint256"},
{"internalType": "uint256", "name": "currentLiquidationThreshold", "type": "uint256"},
{"internalType": "uint256", "name": "ltv", "type": "uint256"},
{"internalType": "uint256", "name": "healthFactor", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
}
];
let provider;
let aavePool;
// 初始化合约实例
function initializeContract() {
provider = new ethers.JsonRpcProvider(RPC_URL);
aavePool = new ethers.Contract(AAVE_V3_POOL_ADDRESS, AAVE_V3_POOL_ABI, provider);
}
// 获取健康因子
async function convertToHealthFactor(userAddress) {
try {
const accountData = await aavePool.getUserAccountData(userAddress);
return ethers.formatUnits(accountData.healthFactor, 18);
} catch (error) {
console.error(`获取 ${userAddress} 健康因素时出错:`, error);
return null;
}
}
// 主处理函数
async function main(params) {
console.log("收到的参数:", JSON.stringify(params, null, 2));
try {
initializeContract();
const data = params.data || params;
let result = {};
// 处理不同的数据格式
if (data.transactions && Array.isArray(data.transactions)) {
// 处理交易数据
const borrowers = data.transactions
.filter(tx => tx.to === AAVE_V3_POOL_ADDRESS)
.map(tx => tx.from)
.filter(address => address !== null && address !== undefined);
if (borrowers.length > 0) {
const processedBorrowers = [];
for (const borrower of borrowers) {
if (typeof borrower !== 'string') {
console.error(`无效的借款人地址: ${borrower}`);
continue;
}
await qnLib.qnAddListItem(`borrowers-${AAVE_V3_POOL_ADDRESS}`, borrower);
try {
const healthFactor = await convertToHealthFactor(borrower);
await qnLib.qnAddSet(`borrower-${borrower}`, healthFactor);
processedBorrowers.push({ borrower, healthFactor });
} catch (error) {
console.error(`处理借款人 ${borrower} 时出错:`, error);
processedBorrowers.push({ borrower, error: error.message });
}
}
result = {
message: `处理了来自区块 ${data.number} 的 ${borrowers.length} 个借款人`,
borrowers: processedBorrowers
};
} else {
result = {
message: `在区块 ${data.number} 中未找到有效借款交易`,
block: data.number
};
}
} else if (data.borrowers && Array.isArray(data.borrowers) && data.borrowers.length > 0) {
// 处理借款人数据
const processedBorrowers = [];
for (const borrowerObj of data.borrowers) {
const borrower = borrowerObj.borrower;
if (typeof borrower !== 'string') {
console.error(`无效的借款人地址: ${borrower}`);
continue;
}
await qnLib.qnAddListItem(`borrowers-${AAVE_V3_POOL_ADDRESS}`, borrower);
try {
const healthFactor = await convertToHealthFactor(borrower);
await qnLib.qnAddSet(`borrower-${borrower}`, healthFactor);
processedBorrowers.push({ borrower, healthFactor });
} catch (error) {
console.error(`处理借款人 ${borrower} 时出错:`, error);
processedBorrowers.push({ borrower, error: error.message });
}
}
result = {
message: `处理了 ${processedBorrowers.length} 个借款人`,
borrowers: processedBorrowers
};
} else if (data.block && data.message && data.message.includes("没有找到借款交易")) {
result = {
message: `在区块 ${data.block} 中未找到借款交易`,
block: data.block
};
} else {
result = {
message: "接收到的无效数据格式",
receivedData: data
};
}
console.log("结果:", JSON.stringify(result, null, 2));
await sendToWebhook(result);
return result;
} catch (error) {
console.error("主函数出错:", error);
return {
message: "处理请求时出错",
error: error.message
};
}
}
// 发送数据到Webhook
async function sendToWebhook(result) {
const payload = JSON.stringify(result);
const options = {
hostname: 'typedwebhook.tools',
path: `/webhook/${WEBHOOK_TOKEN}`,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': payload.length
}
};
return new Promise((resolve, reject) => {
const req = https.request(options, res => {
let data = '';
res.on('data', chunk => { data += chunk; });
res.on('end', () => {
console.log('Webhook 响应:', data);
resolve(data);
});
});
req.on('error', error => {
console.error('发送到 webhook 时出错:', error);
reject(error);
});
req.write(payload);
req.end();
});
}
module.exports = { main };此函数主要完成三个任务:
- 计算借款人的健康因子
- 将借款人信息(地址和健康因子)存储在键值存储中
- 将结果报告发送到 Webhook
系统测试与验证
完成流处理器和函数的设置后,需要等待借款事件发生以测试系统。一旦有借款事件发生,Webhook 将接收到相关数据。
要验证键值存储中的数据,可以通过 REST API 调用检索信息:
# 获取所有借款人列表
curl -X GET \
"https://api.quicknode.com/kv/rest/v1/lists/borrowers-0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY"
# 获取特定借款人的健康因子
curl -X GET \
"https://api.quicknode.com/kv/rest/v1/sets/borrower-0x借款人地址" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY"响应将包含借款人地址列表或特定借款人的健康因子值。
进阶应用与扩展
基于此监控系统,可以进一步扩展以下功能:
- 历史区块高度记录:记录借款人在特定区块时的健康评分,便于历史数据分析
- 阈值警报系统:当借款人的健康评分低于设定阈值时发送通知
- 多协议支持:扩展系统以支持其他借贷协议的健康因子监控
- 数据可视化:将收集的数据通过仪表板展示,提供更直观的监控体验
常见问题
什么是健康因子?
健康因子是 Aave 协议中衡量借款人偿还能力的关键指标。它基于抵押资产价值与借款金额的比率计算,当健康因子低于 1 时,头寸可能被清算。
为什么需要监控 Aave 借款人的健康因子?
监控健康因子可以帮助识别可能被清算的头寸,为风险管理和投资决策提供数据支持。对于借款人而言,及时了解自己的健康因子可以避免意外清算。
流处理技术与传统轮询方式有何优势?
流处理技术提供实时数据推送,减少延迟和资源消耗。与传统轮询相比,它更高效且能够及时响应链上事件,无需不断查询区块链状态。
如何确保监控系统的可靠性?
通过多节点冗余、错误重试机制和实时警报系统可以提高监控系统的可靠性。定期检查流处理器和函数的运行状态也很重要。
这个系统可以监控其他 DeFi 协议吗?
可以,通过调整合约地址和 ABI 定义,该系统可以适配其他类似的 DeFi 借贷协议,实现多协议监控功能。
数据处理和存储是否符合隐私保护要求?
该系统只存储公开的链上数据,不涉及用户隐私信息。所有数据都是区块链上公开可查的信息,符合去中心化应用的透明性原则。
总结
本文详细介绍了如何使用流处理技术和无服务器函数构建 Aave V3 借款人的健康因子监控系统。通过结合实时数据流处理、链上数据查询和键值存储技术,我们建立了一个高效、可扩展的监控解决方案。
这种方案的优势在于其实时性和可扩展性,能够处理大量链上数据并及时响应市场变化。利用无服务器架构,开发者可以专注于业务逻辑而不必担心基础设施管理,大大降低了开发和维护成本。
未来,随着区块链技术的发展和多链生态的成熟,此类监控系统可以进一步扩展到更多区块链网络和 DeFi 协议,为去中心化金融领域提供更全面的风险管理工具。