目录

DPDK - timer

DPDK 提供了 timer 库封装了定时器的实现,如果应用不需要定时器,可以在编译时,通过禁用 rte_timer_manage 来提供性能。

定时器根据每个核心来跟踪,所有待核准的计时器都将被维护,按照跳表(Skip List)中的到期定时器进行排序,其中跳表有 10 级

#define RTE_TIMER_STOP    0 /**< State: timer is stopped. */
#define RTE_TIMER_PENDING 1 /**< State: timer is scheduled. */
#define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
#define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */

#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */

在定时器的头文件中,定义了定时器的多个状态:

  • RTE_TIMER_STOP:定时器未启动,或被显示的停止,没有所有者(OWNER),也不被任何逻辑核心(lcore)拥有,也不在跳表中
  • RTE_TIMER_CONFIG:定时器正在被配置,但尚未启动,由一个逻辑核心拥有,但这个状态的定时器不能被其他核心修改,可能存在跳表中,取决于之前的状态
  • RTE_TIMER_PENDING:定时器已经被配置并等待执行,由一个逻辑核心拥有,同样不能被修改,一定在跳表中等待执行
  • RTE_TIMER_RUNNING:定时器正在运行中

摘抄官方给出的定时器用例的主要部分。

首先进行定时器库(环境)初始化、定时器结构的初始化

	static struct rte_timer timer0;

	/* init RTE timer library */
	rte_timer_subsystem_init();
	/* >8 End of init EAL. */

	/* Init timer structures. 8< */
	rte_timer_init(&timer0);
	/* >8 End of init timer structures. */

随后定义定时器的执行周期,并启动定时器

	/* Load timer0, every second, on main lcore, reloaded automatically. 8< */
	hz = rte_get_timer_hz();
	timer_resolution_cycles = hz * 10 / 1000; /* around 10ms */

	lcore_id = rte_lcore_id();
	rte_timer_reset(&timer0, hz, PERIODICAL, lcore_id, timer0_cb, NULL);

最后初始化一个无限循环运行的线程,对定时器校准

	/* Call lcore_mainloop() on every worker lcore. 8< */
	RTE_LCORE_FOREACH_WORKER(lcore_id) {
		rte_eal_remote_launch(lcore_mainloop, NULL, lcore_id);
	}

这里timer0_cb是定时器具体的执行函数,其中arg是定时器调用时,外部传入的额外参数,在定时器内可以使用rte_timer_stop停止该定时器

/* timer0 callback. 8< */
static void
timer0_cb(__rte_unused struct rte_timer *tim,
	  __rte_unused void *arg)
{
	static unsigned counter = 0;
	unsigned lcore_id = rte_lcore_id();

	printf("%s() on lcore %u\n", __func__, lcore_id);

	/* this timer is automatically reloaded until we decide to
	 * stop it, when counter reaches 20. */
	if ((counter ++) == 20)
		rte_timer_stop(tim);
}
/* >8 End of timer0 callback. */

这里每相隔一段时间,都会去执行rte_timer_manage,用于定时器的校准

static __rte_noreturn int
lcore_mainloop(__rte_unused void *arg)
{
	uint64_t prev_tsc = 0, cur_tsc, diff_tsc;
	unsigned lcore_id;

	lcore_id = rte_lcore_id();
	printf("Starting mainloop on core %u\n", lcore_id);

	/* Main loop. 8< */
	while (1) {
		/*
		 * Call the timer handler on each core: as we don't need a
		 * very precise timer, so only call rte_timer_manage()
		 * every ~10ms. In a real application, this will enhance
		 * performances as reading the HPET timer is not efficient.
		 */
		cur_tsc = rte_get_timer_cycles();
		diff_tsc = cur_tsc - prev_tsc;
		if (diff_tsc > timer_resolution_cycles) {
			rte_timer_manage();
			prev_tsc = cur_tsc;
		}
	}
	/* >8 End of main loop. */
}

针对上述用例中,不太好理解的函数进行介绍与解析

  1. rte_timer_subsystem_init
/**
 * Initialize the timer library.
 *
 * Initializes internal variables (list, locks and so on) for the RTE
 * timer library.
 *
 * @note
 *   This function must be called in every process before using the library.
 *
 * @return
 *   - 0: Success
 *   - -ENOMEM: Unable to allocate memory needed to initialize timer
 *      subsystem
 *   - -EALREADY: timer subsystem was already initialized. Not an error.
 */
int rte_timer_subsystem_init(void);

初始化定时器执行的环境,一般指定时器库的内部变量,列表、锁等等

/**
 * Free timer subsystem resources.
 */
void rte_timer_subsystem_finalize(void);

相对的,存在配套的安全释放函数

  1. rte_timer_init
/**
 * Reset and start the timer associated with the timer handle.
 *
 * The rte_timer_reset() function resets and starts the timer
 * associated with the timer handle *tim*. When the timer expires after
 * *ticks* HPET cycles, the function specified by *fct* will be called
 * with the argument *arg* on core *tim_lcore*.
 *
 * If the timer associated with the timer handle is already running
 * (in the RUNNING state), the function will fail. The user has to check
 * the return value of the function to see if there is a chance that the
 * timer is in the RUNNING state.
 *
 * If the timer is being configured on another core (the CONFIG state),
 * it will also fail.
 *
 * If the timer is pending or stopped, it will be rescheduled with the
 * new parameters.
 *
 * @param tim
 *   The timer handle.
 * @param ticks
 *   The number of cycles (see rte_get_hpet_hz()) before the callback
 *   function is called.
 * @param type
 *   The type can be either:
 *   - PERIODICAL: The timer is automatically reloaded after execution
 *     (returns to the PENDING state)
 *   - SINGLE: The timer is one-shot, that is, the timer goes to a
 *     STOPPED state after execution.
 * @param tim_lcore
 *   The ID of the lcore where the timer callback function has to be
 *   executed. If tim_lcore is LCORE_ID_ANY, the timer library will
 *   launch it on a different core for each call (round-robin).
 * @param fct
 *   The callback function of the timer.
 * @param arg
 *   The user argument of the callback function.
 * @return
 *   - 0: Success; the timer is scheduled.
 *   - (-1): Timer is in the RUNNING or CONFIG state.
 */
int rte_timer_reset(struct rte_timer *tim, uint64_t ticks,
		    enum rte_timer_type type, unsigned tim_lcore,
		    rte_timer_cb_t fct, void *arg);

该函数能重置或启动定时器tim实例,启动时将timfct关联起来,当定时器周期执行,并到周期ticks到期时,会在tim_lcore核心上,随着参数arg重新调用fct。 当定时器实例tim在运行中 (in the RUNNING state) 或在另外的核心上被配置 (the CONFIG state),则该函数会调用失败。 定时器分两种运行类型,单次运行(SINGLE)和周期运行(PERIODICAL),对应填入type字段。

  1. rte_timer_manage
/**
 * Manage the timer list and execute callback functions.
 *
 * This function must be called periodically from EAL lcores
 * main_loop(). It browses the list of pending timers and runs all
 * timers that are expired.
 *
 * The precision of the timer depends on the call frequency of this
 * function. However, the more often the function is called, the more
 * CPU resources it will use.
 *
 * @return
 *   - 0: Success
 *   - -EINVAL: timer subsystem not yet initialized
 */
int rte_timer_manage(void);

我们必须周期性的调用该函数,它用于检测所有到期的定时器,并执行它们的回调函数,如果不调用它,则定时器不会运行。并且调用的周期,决定了定时器的精度,它是 DPDK 定时器机制中不可或缺的一部分

使用定时器来定义周期执行时,DPDK 提供了一套用于换算时间的函数,位于rte_cycles.h头文件中。 首先了解一些基础概念:

  1. hz:表示每秒的周期数,是频率的单位
  2. cycles:在一个给定的时间段内发生的周期数,是 CPU 执行操作的基本事件单位
  3. tick:操作系统中,tick 表示时钟中断的频率,即每秒产生的中断次数,故时钟频率为 F Hz,则每秒的 tick 数量就是 F,每个 tick 的时间是 (1 / F) 秒。可以调整 HZ 宏来改变 tick 的频率

在 DPDK 中,我们通常利用 hz 和 cycles 来换算出时间,应用在定时器上,例如:

	hz = rte_get_timer_hz();
	timer_resolution_cycles = hz * 10 / 1000; /* around 10ms */

由于 hz 表示每秒的周期,则能换算出timer_resolution_cycles是执行 10ms 所需要的周期

  1. rte_get_timer_hz
/**
 * Get the number of cycles in one second for the default timer.
 *
 * @return
 *   The number of cycles in one second.
 */
static inline uint64_t rte_get_timer_hz(void) {...}
  1. rte_get_timer_cycles
/**
 * Get the number of cycles since boot from the default timer.
 *
 * @return
 *   The number of cycles
 */
static inline uint64_t rte_get_timer_cycles(void) {...}

这里的 cycles 是从计算机启动时开始计算,故而只能换算出相对时间。

  1. tsc && hpet
uint64_t rte_get_tsc_hz(void);
static inline uint64_t rte_get_tsc_cycles(void);
<br/>
uint64_t rte_get_hpet_hz(void);
uint64_t rte_get_hpet_cycles(void);

在实际使用中,调用1. 和 2. 上述的两个函数即可获得 hz 和 cycles 来计算时间,但其内部可以使用 TSC 和 HPET 两种方式获取。在编译 DPDK 时,能选择其中一种作为时间源,若 HPET 被允许使用,则会优先使用 HPET,否则会回退使用 TSC。 其中 HPET 是高精度事件计时器,是一种硬件计时器,提供了比 TSC 更高的精度和稳定性,而 TSC 是 CPU 提供的单调递增计数器,其值是 EAL 初始化时测量并设置的。

计算机的时间定义