@ arch/arm/Kconfig:44 @ config ARM
 	select ARCH_SUPPORTS_CFI_CLANG
 	select ARCH_SUPPORTS_HUGETLBFS if ARM_LPAE
 	select ARCH_SUPPORTS_PER_VMA_LOCK
+	select ARCH_SUPPORTS_RT if HAVE_POSIX_CPU_TIMERS_TASK_WORK
 	select ARCH_USE_BUILTIN_BSWAP
 	select ARCH_USE_CMPXCHG_LOCKREF
 	select ARCH_USE_MEMTEST
@ arch/arm/Kconfig:84 @ config ARM
 	select HAS_IOPORT
 	select HAVE_ARCH_AUDITSYSCALL if AEABI && !OABI_COMPAT
 	select HAVE_ARCH_BITREVERSE if (CPU_32v7M || CPU_32v7) && !CPU_32v6
-	select HAVE_ARCH_JUMP_LABEL if !XIP_KERNEL && !CPU_ENDIAN_BE32 && MMU
+	select HAVE_ARCH_JUMP_LABEL if !XIP_KERNEL && !CPU_ENDIAN_BE32 && MMU && !PREEMPT_RT
 	select HAVE_ARCH_KFENCE if MMU && !XIP_KERNEL
 	select HAVE_ARCH_KGDB if !CPU_ENDIAN_BE32 && MMU
 	select HAVE_ARCH_KASAN if MMU && !XIP_KERNEL
@ arch/arm/Kconfig:109 @ config ARM
 	select HAVE_DYNAMIC_FTRACE_WITH_REGS if HAVE_DYNAMIC_FTRACE
 	select HAVE_EFFICIENT_UNALIGNED_ACCESS if (CPU_V6 || CPU_V6K || CPU_V7) && MMU
 	select HAVE_EXIT_THREAD
-	select HAVE_GUP_FAST if ARM_LPAE
+	select HAVE_GUP_FAST if ARM_LPAE && !(PREEMPT_RT && HIGHPTE)
 	select HAVE_FUNCTION_ERROR_INJECTION
 	select HAVE_FUNCTION_GRAPH_TRACER
 	select HAVE_FUNCTION_TRACER if !XIP_KERNEL
@ arch/arm/Kconfig:132 @ config ARM
 	select HAVE_PERF_EVENTS
 	select HAVE_PERF_REGS
 	select HAVE_PERF_USER_STACK_DUMP
+	select HAVE_POSIX_CPU_TIMERS_TASK_WORK if !KVM
 	select MMU_GATHER_RCU_TABLE_FREE if SMP && ARM_LPAE
 	select HAVE_REGS_AND_STACK_ACCESS_API
 	select HAVE_RSEQ
@ arch/arm/mm/fault.c:477 @ do_translation_fault(unsigned long addr, unsigned int fsr,
 	if (addr < TASK_SIZE)
 		return do_page_fault(addr, fsr, regs);
 
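+	/* Enable interrupts if they were enabled in the parent context. */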
+	if (interrupts_enabled(regs))
+		local_irq_enable();
+
 	if (user_mode(regs))
 		goto bad_area;
 
@ arch/arm/mm/fault.c:550 @ do_translation_fault(unsigned long addr, unsigned int fsr,
 static int
 do_sect_fault(unsigned long addr, unsigned int fsr, struct pt_regs *regs)
 {
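+	/* Enable interrupts if they were enabled in the parent context. */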
+	if (interrupts_enabled(regs))
+		local_irq_enable();
+
 	do_bad_area(addr, fsr, regs);
 	return 0;
 }
@ arch/powerpc/Kconfig:173 @ config PPC
 	select ARCH_STACKWALK
 	select ARCH_SUPPORTS_ATOMIC_RMW
 	select ARCH_SUPPORTS_DEBUG_PAGEALLOC	if PPC_BOOK3S || PPC_8xx
+	select ARCH_SUPPORTS_RT			if HAVE_POSIX_CPU_TIMERS_TASK_WORK
 	select ARCH_USE_BUILTIN_BSWAP
 	select ARCH_USE_CMPXCHG_LOCKREF		if PPC64
 	select ARCH_USE_MEMTEST
@ arch/powerpc/Kconfig:277 @ config PPC
 	select HAVE_PERF_EVENTS_NMI		if PPC64
 	select HAVE_PERF_REGS
 	select HAVE_PERF_USER_STACK_DUMP
+	select HAVE_POSIX_CPU_TIMERS_TASK_WORK	if !KVM
 	select HAVE_PREEMPT_DYNAMIC_KEY
 	select HAVE_RETHOOK			if KPROBES
 	select HAVE_REGS_AND_STACK_ACCESS_API
@ arch/powerpc/include/asm/stackprotector.h:22 @
  */
 static __always_inline void boot_init_stack_canary(void)
 {
-	unsigned long canary = get_random_canary();
+	unsigned long canary;
 
+#ifndef CONFIG_PREEMPT_RT
+	canary = get_random_canary();
+#else
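+	/*
+	 * PREEMPT_RT: get_random_canary() may not be safe to call from this
+	 * context, so derive the canary from the stack address instead.
+	 */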
+	canary = ((unsigned long)&canary) & CANARY_MASK;
+#endif
 	current->stack_canary = canary;
 #ifdef CONFIG_PPC64
 	get_paca()->canary = canary;
@ arch/powerpc/kvm/Kconfig:237 @ config KVM_E500MC
 config KVM_MPIC
 	bool "KVM in-kernel MPIC emulation"
 	depends on KVM && PPC_E500
+	depends on !PREEMPT_RT
 	select HAVE_KVM_IRQCHIP
 	select HAVE_KVM_IRQ_ROUTING
 	select HAVE_KVM_MSI
@ arch/powerpc/platforms/pseries/Kconfig:5 @
 config PPC_PSERIES
 	depends on PPC64 && PPC_BOOK3S
 	bool "IBM pSeries & new (POWER5-based) iSeries"
+	select GENERIC_ALLOCATOR
 	select HAVE_PCSPKR_PLATFORM
 	select MPIC
 	select OF_DYNAMIC
@ arch/powerpc/platforms/pseries/iommu.c:29 @
 #include <linux/of_address.h>
 #include <linux/iommu.h>
 #include <linux/rculist.h>
+#include <linux/local_lock.h>
 #include <asm/io.h>
 #include <asm/prom.h>
 #include <asm/rtas.h>
@ arch/powerpc/platforms/pseries/iommu.c:249 @ static int tce_build_pSeriesLP(unsigned long liobn, long tcenum, long tceshift,
 	return ret;
 }
 
-static DEFINE_PER_CPU(__be64 *, tce_page);
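+/*
+ * Protect the per-CPU TCE page with a local_lock_t so that the code also
+ * works on PREEMPT_RT, where local_lock_irqsave() acquires a per-CPU lock
+ * instead of disabling interrupts.
+ */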
+struct tce_page {
+	__be64 * page;
+	local_lock_t lock;
+};
+static DEFINE_PER_CPU(struct tce_page, tce_page) = {
+	.lock = INIT_LOCAL_LOCK(lock),
+};
 
 static int tce_buildmulti_pSeriesLP(struct iommu_table *tbl, long tcenum,
 				     long npages, unsigned long uaddr,
@ arch/powerpc/platforms/pseries/iommu.c:278 @ static int tce_buildmulti_pSeriesLP(struct iommu_table *tbl, long tcenum,
 		                           direction, attrs);
 	}
 
-	local_irq_save(flags);	/* to protect tcep and the page behind it */
+	/* to protect tcep and the page behind it */
+	local_lock_irqsave(&tce_page.lock, flags);
 
-	tcep = __this_cpu_read(tce_page);
+	tcep = __this_cpu_read(tce_page.page);
 
 	/* This is safe to do since interrupts are off when we're called
 	 * from iommu_alloc{,_sg}()
@ arch/powerpc/platforms/pseries/iommu.c:290 @ static int tce_buildmulti_pSeriesLP(struct iommu_table *tbl, long tcenum,
 		tcep = (__be64 *)__get_free_page(GFP_ATOMIC);
 		/* If allocation fails, fall back to the loop implementation */
 		if (!tcep) {
-			local_irq_restore(flags);
+			local_unlock_irqrestore(&tce_page.lock, flags);
 			return tce_build_pSeriesLP(tbl->it_index, tcenum,
 					tceshift,
 					npages, uaddr, direction, attrs);
 		}
-		__this_cpu_write(tce_page, tcep);
+		__this_cpu_write(tce_page.page, tcep);
 	}
 
 	rpn = __pa(uaddr) >> tceshift;
@ arch/powerpc/platforms/pseries/iommu.c:325 @ static int tce_buildmulti_pSeriesLP(struct iommu_table *tbl, long tcenum,
 		tcenum += limit;
 	} while (npages > 0 && !rc);
 
-	local_irq_restore(flags);
+	local_unlock_irqrestore(&tce_page.lock, flags);
 
 	if (unlikely(rc == H_NOT_ENOUGH_RESOURCES)) {
 		ret = (int)rc;
@ arch/powerpc/platforms/pseries/iommu.c:509 @ static int tce_setrange_multi_pSeriesLP(unsigned long start_pfn,
 				DMA_BIDIRECTIONAL, 0);
 	}
 
-	local_irq_disable();	/* to protect tcep and the page behind it */
-	tcep = __this_cpu_read(tce_page);
+	/* to protect tcep and the page behind it */
+	local_lock_irq(&tce_page.lock);
+	tcep = __this_cpu_read(tce_page.page);
 
 	if (!tcep) {
 		tcep = (__be64 *)__get_free_page(GFP_ATOMIC);
 		if (!tcep) {
-			local_irq_enable();
+			local_unlock_irq(&tce_page.lock);
 			return -ENOMEM;
 		}
-		__this_cpu_write(tce_page, tcep);
+		__this_cpu_write(tce_page.page, tcep);
 	}
 
 	proto_tce = TCE_PCI_READ | TCE_PCI_WRITE;
@ arch/powerpc/platforms/pseries/iommu.c:562 @ static int tce_setrange_multi_pSeriesLP(unsigned long start_pfn,
 
 	/* error cleanup: caller will clear whole range */
 
-	local_irq_enable();
+	local_unlock_irq(&tce_page.lock);
 	return rc;
 }
 
@ drivers/gpu/drm/i915/Kconfig:6 @ config DRM_I915
 	tristate "Intel 8xx/9xx/G3x/G4x/HD Graphics"
 	depends on DRM
 	depends on X86 && PCI
-	depends on !PREEMPT_RT
 	select INTEL_GTT if X86
 	select INTERVAL_TREE
 	# we need shmfs for the swappable backing store, and in particular
@ drivers/gpu/drm/i915/display/intel_crtc.c:565 @ void intel_pipe_update_start(struct intel_atomic_state *state,
 	 */
 	intel_psr_wait_for_idle_locked(new_crtc_state);
 
-	local_irq_disable();
+	if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_disable();
 
 	crtc->debug.min_vbl = evade.min;
 	crtc->debug.max_vbl = evade.max;
@ drivers/gpu/drm/i915/display/intel_crtc.c:584 @ void intel_pipe_update_start(struct intel_atomic_state *state,
 	return;
 
 irq_disable:
-	local_irq_disable();
+	if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_disable();
 }
 
 #if IS_ENABLED(CONFIG_DRM_I915_DEBUG_VBLANK_EVADE)
@ drivers/gpu/drm/i915/display/intel_crtc.c:731 @ void intel_pipe_update_end(struct intel_atomic_state *state,
 	if (!state->base.legacy_cursor_update)
 		intel_vrr_send_push(NULL, new_crtc_state);
 
-	local_irq_enable();
+	if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_enable();
 
 	if (intel_vgpu_active(dev_priv))
 		goto out;
@ drivers/gpu/drm/i915/display/intel_cursor.c:932 @ intel_legacy_cursor_update(struct drm_plane *_plane,
 		 */
 		intel_psr_wait_for_idle_locked(crtc_state);
 
-		local_irq_disable();
+		if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+			local_irq_disable();
 
 		intel_vblank_evade(&evade);
 
 		drm_crtc_vblank_put(&crtc->base);
 	} else {
-		local_irq_disable();
+		if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+			local_irq_disable();
 	}
 
 	if (new_plane_state->uapi.visible) {
@ drivers/gpu/drm/i915/display/intel_cursor.c:950 @ intel_legacy_cursor_update(struct drm_plane *_plane,
 		intel_plane_disable_arm(NULL, plane, crtc_state);
 	}
 
-	local_irq_enable();
+	if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+		local_irq_enable();
 
 	intel_psr_unlock(crtc_state);
 
@ drivers/gpu/drm/i915/display/intel_display_trace.h:16 @
 #if !defined(__INTEL_DISPLAY_TRACE_H__) || defined(TRACE_HEADER_MULTI_READ)
 #define __INTEL_DISPLAY_TRACE_H__
 
+#if defined(CONFIG_PREEMPT_RT) && !defined(NOTRACE)
+#define NOTRACE
+#endif
+
 #include <linux/string.h>
 #include <linux/string_helpers.h>
 #include <linux/types.h>
@ drivers/gpu/drm/i915/display/intel_vblank.c:315 @ static void intel_vblank_section_exit(struct intel_display *display)
 	struct drm_i915_private *i915 = to_i915(display->drm);
 	spin_unlock(&i915->uncore.lock);
 }
+
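+/*
+ * Variants which take the uncore lock together with the interrupt
+ * disable/restore, so that on PREEMPT_RT the spinlock_t provides the
+ * protection without hard interrupt disabling.
+ */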
+static void intel_vblank_section_enter_irqf(struct intel_display *display, unsigned long *flags)
+	__acquires(i915->uncore.lock)
+{
+	struct drm_i915_private *i915 = to_i915(display->drm);
+	spin_lock_irqsave(&i915->uncore.lock, *flags);
+}
+
+static void intel_vblank_section_exit_irqf(struct intel_display *display, unsigned long flags)
+	__releases(i915->uncore.lock)
+{
+	struct drm_i915_private *i915 = to_i915(display->drm);
+	spin_unlock_irqrestore(&i915->uncore.lock, flags);
+}
 #else
 static void intel_vblank_section_enter(struct intel_display *display)
 {
@ drivers/gpu/drm/i915/display/intel_vblank.c:337 @ static void intel_vblank_section_enter(struct intel_display *display)
 static void intel_vblank_section_exit(struct intel_display *display)
 {
 }
+
+static void intel_vblank_section_enter_irqf(struct intel_display *display, unsigned long *flags)
+{
+	*flags = 0;
+}
+
+static void intel_vblank_section_exit_irqf(struct intel_display *display, unsigned long flags)
+{
+	if (flags)
+		return;
+}
 #endif
 
 static bool i915_get_crtc_scanoutpos(struct drm_crtc *_crtc,
@ drivers/gpu/drm/i915/display/intel_vblank.c:384 @ static bool i915_get_crtc_scanoutpos(struct drm_crtc *_crtc,
 	 * timing critical raw register reads, potentially with
 	 * preemption disabled, so the following code must not block.
 	 */
-	local_irq_save(irqflags);
-	intel_vblank_section_enter(display);
+	intel_vblank_section_enter_irqf(display, &irqflags);
 
-	/* preempt_disable_rt() should go right here in PREEMPT_RT patchset. */
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		preempt_disable();
 
 	/* Get optional system timestamp before query. */
 	if (stime)
@ drivers/gpu/drm/i915/display/intel_vblank.c:451 @ static bool i915_get_crtc_scanoutpos(struct drm_crtc *_crtc,
 	if (etime)
 		*etime = ktime_get();
 
-	/* preempt_enable_rt() should go right here in PREEMPT_RT patchset. */
+	if (IS_ENABLED(CONFIG_PREEMPT_RT))
+		preempt_enable();
 
-	intel_vblank_section_exit(display);
-	local_irq_restore(irqflags);
+	intel_vblank_section_exit_irqf(display, irqflags);
 
 	/*
 	 * While in vblank, position will be negative
@ drivers/gpu/drm/i915/display/intel_vblank.c:492 @ int intel_get_crtc_scanline(struct intel_crtc *crtc)
 	unsigned long irqflags;
 	int position;
 
-	local_irq_save(irqflags);
-	intel_vblank_section_enter(display);
+	intel_vblank_section_enter_irqf(display, &irqflags);
 
 	position = __intel_get_crtc_scanline(crtc);
 
-	intel_vblank_section_exit(display);
-	local_irq_restore(irqflags);
+	intel_vblank_section_exit_irqf(display, irqflags);
 
 	return position;
 }
@ drivers/gpu/drm/i915/display/intel_vblank.c:756 @ int intel_vblank_evade(struct intel_vblank_evade_ctx *evade)
 			break;
 		}
 
-		local_irq_enable();
+		if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+			local_irq_enable();
 
 		timeout = schedule_timeout(timeout);
 
-		local_irq_disable();
+		if (!IS_ENABLED(CONFIG_PREEMPT_RT))
+			local_irq_disable();
 	}
 
 	finish_wait(wq, &wait);
@ drivers/gpu/drm/i915/gt/intel_engine_cs.c:1612 @ u64 intel_engine_get_last_batch_head(const struct intel_engine_cs *engine)
 
 static unsigned long stop_timeout(const struct intel_engine_cs *engine)
 {
-	if (in_atomic() || irqs_disabled()) /* inside atomic preempt-reset? */
+	if (in_atomic() || irqs_disabled() || rcu_preempt_depth()) /* inside atomic preempt-reset? */
 		return 0;
 
 	/*
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:1297 @ static void execlists_dequeue(struct intel_engine_cs *engine)
 	 * and context switches) submission.
 	 */
 
-	spin_lock(&sched_engine->lock);
+	spin_lock_irq(&sched_engine->lock);
 
 	/*
 	 * If the queue is higher priority than the last
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:1397 @ static void execlists_dequeue(struct intel_engine_cs *engine)
 				 * Even if ELSP[1] is occupied and not worthy
 				 * of timeslices, our queue might be.
 				 */
-				spin_unlock(&sched_engine->lock);
+				spin_unlock_irq(&sched_engine->lock);
 				return;
 			}
 		}
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:1423 @ static void execlists_dequeue(struct intel_engine_cs *engine)
 
 		if (last && !can_merge_rq(last, rq)) {
 			spin_unlock(&ve->base.sched_engine->lock);
-			spin_unlock(&engine->sched_engine->lock);
+			spin_unlock_irq(&engine->sched_engine->lock);
 			return; /* leave this for another sibling */
 		}
 
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:1585 @ static void execlists_dequeue(struct intel_engine_cs *engine)
 	 */
 	sched_engine->queue_priority_hint = queue_prio(sched_engine);
 	i915_sched_engine_reset_on_empty(sched_engine);
-	spin_unlock(&sched_engine->lock);
+	spin_unlock_irq(&sched_engine->lock);
 
 	/*
 	 * We can skip poking the HW if we ended up with exactly the same set
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:1611 @ static void execlists_dequeue(struct intel_engine_cs *engine)
 	}
 }
 
-static void execlists_dequeue_irq(struct intel_engine_cs *engine)
-{
-	local_irq_disable(); /* Suspend interrupts across request submission */
-	execlists_dequeue(engine);
-	local_irq_enable(); /* flush irq_work (e.g. breadcrumb enabling) */
-}
-
 static void clear_ports(struct i915_request **ports, int count)
 {
 	memset_p((void **)ports, NULL, count);
@ drivers/gpu/drm/i915/gt/intel_execlists_submission.c:2465 @ static void execlists_submission_tasklet(struct tasklet_struct *t)
 	}
 
 	if (!engine->execlists.pending[0]) {
-		execlists_dequeue_irq(engine);
+		execlists_dequeue(engine);
 		start_timeslice(engine);
 	}
 
@ drivers/gpu/drm/i915/gt/uc/intel_guc.h:365 @ static inline int intel_guc_send_busy_loop(struct intel_guc *guc,
 {
 	int err;
 	unsigned int sleep_period_ms = 1;
-	bool not_atomic = !in_atomic() && !irqs_disabled();
+	bool not_atomic = !in_atomic() && !irqs_disabled() && !rcu_preempt_depth();
 
 	/*
 	 * FIXME: Have caller pass in if we are in an atomic context to avoid
@ drivers/gpu/drm/i915/i915_request.c:611 @ bool __i915_request_submit(struct i915_request *request)
 
 	RQ_TRACE(request, "\n");
 
-	GEM_BUG_ON(!irqs_disabled());
 	lockdep_assert_held(&engine->sched_engine->lock);
 
 	/*
@ drivers/gpu/drm/i915/i915_request.c:719 @ void __i915_request_unsubmit(struct i915_request *request)
 	 */
 	RQ_TRACE(request, "\n");
 
-	GEM_BUG_ON(!irqs_disabled());
 	lockdep_assert_held(&engine->sched_engine->lock);
 
 	/*
@ drivers/gpu/drm/i915/i915_trace.h:9 @
 #if !defined(_I915_TRACE_H_) || defined(TRACE_HEADER_MULTI_READ)
 #define _I915_TRACE_H_
 
+#if defined(CONFIG_PREEMPT_RT) && !defined(NOTRACE)
+#define NOTRACE
+#endif
+
 #include <linux/stringify.h>
 #include <linux/types.h>
 #include <linux/tracepoint.h>
@ drivers/gpu/drm/i915/i915_utils.h:270 @ wait_remaining_ms_from_jiffies(unsigned long timestamp_jiffies, int to_wait_ms)
 						   (Wmax))
 #define wait_for(COND, MS)		_wait_for((COND), (MS) * 1000, 10, 1000)
 
-/* If CONFIG_PREEMPT_COUNT is disabled, in_atomic() always reports false. */
-#if IS_ENABLED(CONFIG_DRM_I915_DEBUG) && IS_ENABLED(CONFIG_PREEMPT_COUNT)
+/*
+ * If CONFIG_PREEMPT_COUNT is disabled, in_atomic() always reports false.
+ * On PREEMPT_RT a context does not become atomic by running in an interrupt
+ * handler or by acquiring a spinlock_t. This leads to warnings which don't
+ * occur otherwise, therefore the check is disabled.
+ */
+#if IS_ENABLED(CONFIG_DRM_I915_DEBUG) && IS_ENABLED(CONFIG_PREEMPT_COUNT) && !defined(CONFIG_PREEMPT_RT)
 # define _WAIT_FOR_ATOMIC_CHECK(ATOMIC) WARN_ON_ONCE((ATOMIC) && !in_atomic())
 #else
 # define _WAIT_FOR_ATOMIC_CHECK(ATOMIC) do { } while (0)
@ drivers/gpu/drm/i915/intel_uncore_trace.h:10 @
 #if !defined(__INTEL_UNCORE_TRACE_H__) || defined(TRACE_HEADER_MULTI_READ)
 #define __INTEL_UNCORE_TRACE_H__
 
+#if defined(CONFIG_PREEMPT_RT) && !defined(NOTRACE)
+#define NOTRACE
+#endif
+
 #include "i915_reg_defs.h"
 
 #include <linux/types.h>
@ drivers/tty/serial/8250/8250_core.c:394 @ void __init serial8250_register_ports(struct uart_driver *drv, struct device *de
 
 #ifdef CONFIG_SERIAL_8250_CONSOLE
 
-static void univ8250_console_write(struct console *co, const char *s,
-				   unsigned int count)
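+/*
+ * nbcon console callbacks: the atomic variant may be called from any
+ * context, the threaded variant runs from the console's printer thread.
+ * Both print via serial8250_console_write().
+ */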
+static void univ8250_console_write_atomic(struct console *co,
+					  struct nbcon_write_context *wctxt)
 {
 	struct uart_8250_port *up = &serial8250_ports[co->index];
 
-	serial8250_console_write(up, s, count);
+	serial8250_console_write(up, wctxt, true);
+}
+
+static void univ8250_console_write_thread(struct console *co,
+					  struct nbcon_write_context *wctxt)
+{
+	struct uart_8250_port *up = &serial8250_ports[co->index];
+
+	serial8250_console_write(up, wctxt, false);
+}
+
+static void univ8250_console_device_lock(struct console *co, unsigned long *flags)
+{
+	struct uart_port *up = &serial8250_ports[co->index].port;
+
+	__uart_port_lock_irqsave(up, flags);
+}
+
+static void univ8250_console_device_unlock(struct console *co, unsigned long flags)
+{
+	struct uart_port *up = &serial8250_ports[co->index].port;
+
+	__uart_port_unlock_irqrestore(up, flags);
 }
 
 static int univ8250_console_setup(struct console *co, char *options)
@ drivers/tty/serial/8250/8250_core.c:522 @ static int univ8250_console_match(struct console *co, char *name, int idx,
 
 static struct console univ8250_console = {
 	.name		= "ttyS",
-	.write		= univ8250_console_write,
+	.write_atomic	= univ8250_console_write_atomic,
+	.write_thread	= univ8250_console_write_thread,
+	.device_lock	= univ8250_console_device_lock,
+	.device_unlock	= univ8250_console_device_unlock,
 	.device		= uart_console_device,
 	.setup		= univ8250_console_setup,
 	.exit		= univ8250_console_exit,
 	.match		= univ8250_console_match,
-	.flags		= CON_PRINTBUFFER | CON_ANYTIME,
+	.flags		= CON_PRINTBUFFER | CON_ANYTIME | CON_NBCON,
 	.index		= -1,
 	.data		= &serial8250_reg,
 };
@ drivers/tty/serial/8250/8250_port.c:708 @ static void serial8250_clear_interrupts(struct uart_port *port)
 	serial_port_in(port, UART_MSR);
 }
 
-static void serial8250_clear_IER(struct uart_8250_port *up)
+/*
+ * Only to be used directly by the console write helper
+ * serial8250_console_write(), which may be called without the port lock
+ * held. Use serial8250_clear_IER() for all other cases.
+ */
+static void __serial8250_clear_IER(struct uart_8250_port *up)
 {
 	if (up->capabilities & UART_CAP_UUE)
 		serial_out(up, UART_IER, UART_IER_UUE);
@ drivers/tty/serial/8250/8250_port.c:721 @ static void serial8250_clear_IER(struct uart_8250_port *up)
 		serial_out(up, UART_IER, 0);
 }
 
+static inline void serial8250_clear_IER(struct uart_8250_port *up)
+{
+	/* Port locked to synchronize UART_IER access against the console */
+	lockdep_assert_held_once(&up->port.lock);
+
+	__serial8250_clear_IER(up);
+}
+
 /*
  * This is a quickie test to see how big the FIFO is.
  * It doesn't work at all the time, more's the pity.
@ drivers/tty/serial/8250/8250_port.c:1306 @ void serial8250_em485_stop_tx(struct uart_8250_port *p, bool toggle_ier)
 {
 	unsigned char mcr = serial8250_in_MCR(p);
 
-	/* Port locked to synchronize UART_IER access against the console. */
-	lockdep_assert_held_once(&p->port.lock);
-
 	if (p->port.rs485.flags & SER_RS485_RTS_AFTER_SEND)
 		mcr |= UART_MCR_RTS;
 	else
@ drivers/tty/serial/8250/8250_port.c:1321 @ void serial8250_em485_stop_tx(struct uart_8250_port *p, bool toggle_ier)
 		serial8250_clear_and_reinit_fifos(p);
 
 		if (toggle_ier) {
+			/*
+			 * Port locked to synchronize UART_IER access against
+			 * the console. The lockdep_assert must be restricted
+			 * to this condition because only here is it
+			 * guaranteed that the port lock is held. The other
+			 * hardware access in this function is synchronized
+			 * by console ownership.
+			 */
+			lockdep_assert_held_once(&p->port.lock);
+
 			p->ier |= UART_IER_RLSI | UART_IER_RDI;
 			serial_port_out(&p->port, UART_IER, p->ier);
 		}
@ drivers/tty/serial/8250/8250_port.c:3266 @ EXPORT_SYMBOL_GPL(serial8250_set_defaults);
 
 static void serial8250_console_putchar(struct uart_port *port, unsigned char ch)
 {
+	struct uart_8250_port *up = up_to_u8250p(port);
+
 	serial_port_out(port, UART_TX, ch);
+
+	up->console_line_ended = (ch == '\n');
 }
 
 static void serial8250_console_wait_putchar(struct uart_port *port, unsigned char ch)
@ drivers/tty/serial/8250/8250_port.c:3307 @ static void serial8250_console_restore(struct uart_8250_port *up)
 	serial8250_out_MCR(up, up->mcr | UART_MCR_DTR | UART_MCR_RTS);
 }
 
-static void fifo_wait_for_lsr(struct uart_8250_port *up, unsigned int count)
+static void fifo_wait_for_lsr(struct uart_8250_port *up,
+			      struct nbcon_write_context *wctxt,
+			      unsigned int count)
 {
 	unsigned int i;
 
 	for (i = 0; i < count; i++) {
+		/*
+		 * Pass the ownership as quickly as possible to a higher
+		 * priority context. Otherwise, its attempt to take over
+		 * the ownership might time out. The new owner will wait
+		 * for UART_LSR_THRE before reusing the FIFO.
+		 */
+		if (!nbcon_can_proceed(wctxt))
+			return;
+
 		if (wait_for_lsr(up, UART_LSR_THRE))
 			return;
 	}
@ drivers/tty/serial/8250/8250_port.c:3335 @ static void fifo_wait_for_lsr(struct uart_8250_port *up, unsigned int count)
  * to get empty.
  */
 static void serial8250_console_fifo_write(struct uart_8250_port *up,
-					  const char *s, unsigned int count)
+					  struct nbcon_write_context *wctxt)
 {
-	const char *end = s + count;
 	unsigned int fifosize = up->tx_loadsz;
 	struct uart_port *port = &up->port;
+	const char *s = wctxt->outbuf;
+	const char *end = s + wctxt->len;
 	unsigned int tx_count = 0;
 	bool cr_sent = false;
 	unsigned int i;
 
 	while (s != end) {
 		/* Allow timeout for each byte of a possibly full FIFO */
-		fifo_wait_for_lsr(up, fifosize);
+		fifo_wait_for_lsr(up, wctxt, fifosize);
 
+		/*
+		 * Fill the FIFO. If a handover or takeover occurs, writing
+		 * must be aborted since wctxt->outbuf and wctxt->len are no
+		 * longer valid.
+		 */
 		for (i = 0; i < fifosize && s != end; ++i) {
+			if (!nbcon_enter_unsafe(wctxt))
+				return;
+
 			if (*s == '\n' && !cr_sent) {
 				serial8250_console_putchar(port, '\r');
 				cr_sent = true;
@ drivers/tty/serial/8250/8250_port.c:3365 @ static void serial8250_console_fifo_write(struct uart_8250_port *up,
 				serial8250_console_putchar(port, *s++);
 				cr_sent = false;
 			}
+
+			nbcon_exit_unsafe(wctxt);
 		}
 		tx_count = i;
 	}
@ drivers/tty/serial/8250/8250_port.c:3375 @ static void serial8250_console_fifo_write(struct uart_8250_port *up,
 	 * Allow timeout for each byte written since the caller will only wait
 	 * for UART_LSR_BOTH_EMPTY using the timeout of a single character
 	 */
-	fifo_wait_for_lsr(up, tx_count);
+	fifo_wait_for_lsr(up, wctxt, tx_count);
+}
+
+static void serial8250_console_byte_write(struct uart_8250_port *up,
+					  struct nbcon_write_context *wctxt)
+{
+	struct uart_port *port = &up->port;
+	const char *s = wctxt->outbuf;
+	const char *end = s + wctxt->len;
+
+	/*
+	 * Write out the message. If a handover or takeover occurs, writing
+	 * must be aborted since wctxt->outbuf and wctxt->len are no longer
+	 * valid.
+	 */
+	while (s != end) {
+		if (!nbcon_enter_unsafe(wctxt))
+			return;
+
+		uart_console_write(port, s++, 1, serial8250_console_wait_putchar);
+
+		nbcon_exit_unsafe(wctxt);
+	}
 }
 
 /*
- *	Print a string to the serial port trying not to disturb
- *	any possible real use of the port...
+ * Print a string to the serial port trying not to disturb
+ * any possible real use of the port...
  *
- *	The console_lock must be held when we get here.
- *
- *	Doing runtime PM is really a bad idea for the kernel console.
- *	Thus, we assume the function is called when device is powered up.
+ * Doing runtime PM is really a bad idea for the kernel console.
+ * Thus, assume it is called when the device is powered up.
  */
-void serial8250_console_write(struct uart_8250_port *up, const char *s,
-			      unsigned int count)
+void serial8250_console_write(struct uart_8250_port *up,
+			      struct nbcon_write_context *wctxt,
+			      bool is_atomic)
 {
 	struct uart_8250_em485 *em485 = up->em485;
 	struct uart_port *port = &up->port;
-	unsigned long flags;
-	unsigned int ier, use_fifo;
-	int locked = 1;
+	unsigned int ier;
+	bool use_fifo;
 
-	touch_nmi_watchdog();
-
-	if (oops_in_progress)
-		locked = uart_port_trylock_irqsave(port, &flags);
-	else
-		uart_port_lock_irqsave(port, &flags);
+	if (!nbcon_enter_unsafe(wctxt))
+		return;
 
 	/*
-	 *	First save the IER then disable the interrupts
+	 * First, save the IER, then disable the interrupts. The special
+	 * variant to clear the IER is used because console printing may
+	 * occur without holding the port lock.
 	 */
 	ier = serial_port_in(port, UART_IER);
-	serial8250_clear_IER(up);
+	__serial8250_clear_IER(up);
 
 	/* check scratch reg to see if port powered off during system sleep */
 	if (up->canary && (up->canary != serial_port_in(port, UART_SCR))) {
@ drivers/tty/serial/8250/8250_port.c:3439 @ void serial8250_console_write(struct uart_8250_port *up, const char *s,
 		mdelay(port->rs485.delay_rts_before_send);
 	}
 
+	/* If ownership was lost, no writing is allowed */
+	if (!nbcon_can_proceed(wctxt))
+		goto skip_write;
+
+	/*
+	 * If the console printer did not fully output the previous line, it
+	 * must have been handed over or taken over. Insert a newline to
+	 * maintain clean output.
+	 */
+	if (!up->console_line_ended)
+		uart_console_write(port, "\n", 1, serial8250_console_wait_putchar);
+
 	use_fifo = (up->capabilities & UART_CAP_FIFO) &&
 		/*
 		 * BCM283x requires to check the fifo
@ drivers/tty/serial/8250/8250_port.c:3471 @ void serial8250_console_write(struct uart_8250_port *up, const char *s,
 		 */
 		!(up->port.flags & UPF_CONS_FLOW);
 
+	nbcon_exit_unsafe(wctxt);
+
 	if (likely(use_fifo))
-		serial8250_console_fifo_write(up, s, count);
+		serial8250_console_fifo_write(up, wctxt);
 	else
-		uart_console_write(port, s, count, serial8250_console_wait_putchar);
+		serial8250_console_byte_write(up, wctxt);
+skip_write:
+	/*
+	 * If ownership was lost, this context must reacquire ownership and
+	 * re-enter the unsafe section in order to perform final actions
+	 * (such as re-enabling interrupts).
+	 */
+	if (!nbcon_can_proceed(wctxt)) {
+		do {
+			nbcon_reacquire_nobuf(wctxt);
+		} while (!nbcon_enter_unsafe(wctxt));
+	}
 
 	/*
 	 *	Finally, wait for transmitter to become empty
@ drivers/tty/serial/8250/8250_port.c:3510 @ void serial8250_console_write(struct uart_8250_port *up, const char *s,
 	 *	call it if we have saved something in the saved flags
 	 *	while processing with interrupts off.
 	 */
-	if (up->msr_saved_flags)
-		serial8250_modem_status(up);
+	if (up->msr_saved_flags) {
+		/*
+		 * In the atomic case this must be deferred to irq_work
+		 * because the current context may not permit waking up tasks.
+		 */
+		if (is_atomic)
+			irq_work_queue(&up->modem_status_work);
+		else
+			serial8250_modem_status(up);
+	}
 
-	if (locked)
-		uart_port_unlock_irqrestore(port, flags);
+	nbcon_exit_unsafe(wctxt);
 }
 
 static unsigned int probe_baud(struct uart_port *port)
@ drivers/tty/serial/8250/8250_port.c:3539 @ static unsigned int probe_baud(struct uart_port *port)
 	return (port->uartclk / 16) / quot;
 }
 
+/*
+ * irq_work handler to handle the modem status. It is only triggered via the
+ * ->write_atomic() callback, which may run in a scheduler or NMI context
+ * that cannot wake up tasks.
+ */
+static void modem_status_handler(struct irq_work *iwp)
+{
+	struct uart_8250_port *up = container_of(iwp, struct uart_8250_port, modem_status_work);
+	struct uart_port *port = &up->port;
+
+	uart_port_lock(port);
+	serial8250_modem_status(up);
+	uart_port_unlock(port);
+}
+
 int serial8250_console_setup(struct uart_port *port, char *options, bool probe)
 {
+	struct uart_8250_port *up = up_to_u8250p(port);
 	int baud = 9600;
 	int bits = 8;
 	int parity = 'n';
@ drivers/tty/serial/8250/8250_port.c:3566 @ int serial8250_console_setup(struct uart_port *port, char *options, bool probe)
 	if (!port->iobase && !port->membase)
 		return -ENODEV;
 
+	up->console_line_ended = true;
+	init_irq_work(&up->modem_status_work, modem_status_handler);
+
 	if (options)
 		uart_parse_options(options, &baud, &parity, &bits, &flow);
 	else if (probe)
@ include/linux/serial_8250.h:153 @ struct uart_8250_port {
 #define LSR_SAVE_FLAGS UART_LSR_BRK_ERROR_BITS
 	u16			lsr_saved_flags;
 	u16			lsr_save_mask;
+
+	/*
+	 * Track when a console line has been fully written to the
+	 * hardware, i.e. true when the most recent byte written to
+	 * UART_TX by the console was '\n'.
+	 */
+	bool			console_line_ended;
+
 #define MSR_SAVE_FLAGS UART_MSR_ANY_DELTA
 	unsigned char		msr_saved_flags;
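+	/* Deferred modem status handling for the console's atomic write path. */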
+	struct irq_work		modem_status_work;
 
 	struct uart_8250_dma	*dma;
 	const struct uart_8250_ops *ops;
@ include/linux/serial_8250.h:214 @ void serial8250_tx_chars(struct uart_8250_port *up);
 unsigned int serial8250_modem_status(struct uart_8250_port *up);
 void serial8250_init_port(struct uart_8250_port *up);
 void serial8250_set_defaults(struct uart_8250_port *up);
-void serial8250_console_write(struct uart_8250_port *up, const char *s,
-			      unsigned int count);
+void serial8250_console_write(struct uart_8250_port *up,
+			      struct nbcon_write_context *wctxt, bool in_atomic);
 int serial8250_console_setup(struct uart_port *port, char *options, bool probe);
 int serial8250_console_exit(struct uart_port *port);
 
@ kernel/Kconfig.preempt:106 @ config PREEMPT_RT
 	  Select this if you are building a kernel for systems which
 	  require real-time guarantees.
 
+config PREEMPT_RT_NEEDS_BH_LOCK
+	bool "Enforce softirq synchronisation on PREEMPT_RT"
+	depends on PREEMPT_RT
+	help
+	  Enforce synchronisation across softirq contexts. On PREEMPT_RT
+	  softirq processing is preemptible. This enforces the same per-CPU
+	  BKL semantic that non-PREEMPT_RT builds have. It should not be
+	  needed because per-CPU locks were added to avoid the per-CPU BKL.
+
+	  This switch provides the old behaviour for testing purposes. Select
+	  this if you suspect an error with preemptible softirqs and want to
+	  test the old synchronised behaviour.
+
 config PREEMPT_COUNT
        bool
 
@ kernel/ksysfs.c:191 @ KERNEL_ATTR_RO(crash_elfcorehdr_size);
 
 #endif /* CONFIG_VMCORE_INFO */
 
+#if defined(CONFIG_PREEMPT_RT)
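+/* Expose /sys/kernel/realtime so that user space can detect PREEMPT_RT. */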
+static ssize_t realtime_show(struct kobject *kobj,
+			     struct kobj_attribute *attr, char *buf)
+{
+	return sprintf(buf, "%d\n", 1);
+}
+KERNEL_ATTR_RO(realtime);
+#endif
+
 /* whether file capabilities are enabled */
 static ssize_t fscaps_show(struct kobject *kobj,
 				  struct kobj_attribute *attr, char *buf)
@ kernel/ksysfs.c:283 @ static struct attribute * kernel_attrs[] = {
 #ifndef CONFIG_TINY_RCU
 	&rcu_expedited_attr.attr,
 	&rcu_normal_attr.attr,
+#endif
+#ifdef CONFIG_PREEMPT_RT
+	&realtime_attr.attr,
 #endif
 	NULL
 };
@ kernel/softirq.c:168 @ void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
 	/* First entry of a task into a BH disabled section? */
 	if (!current->softirq_disable_cnt) {
 		if (preemptible()) {
-			local_lock(&softirq_ctrl.lock);
+			if (IS_ENABLED(CONFIG_PREEMPT_RT_NEEDS_BH_LOCK))
+				local_lock(&softirq_ctrl.lock);
+			else
+				migrate_disable();
+
 			/* Required to meet the RCU bottomhalf requirements. */
 			rcu_read_lock();
 		} else {
@ kernel/softirq.c:184 @ void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
 	 * Track the per CPU softirq disabled state. On RT this is per CPU
 	 * state to allow preemption of bottom half disabled sections.
 	 */
-	newcnt = __this_cpu_add_return(softirq_ctrl.cnt, cnt);
-	/*
-	 * Reflect the result in the task state to prevent recursion on the
-	 * local lock and to make softirq_count() & al work.
-	 */
-	current->softirq_disable_cnt = newcnt;
+	if (IS_ENABLED(CONFIG_PREEMPT_RT_NEEDS_BH_LOCK)) {
+		newcnt = this_cpu_add_return(softirq_ctrl.cnt, cnt);
+		/*
+		 * Reflect the result in the task state to prevent recursion on the
+		 * local lock and to make softirq_count() & al work.
+		 */
+		current->softirq_disable_cnt = newcnt;
 
-	if (IS_ENABLED(CONFIG_TRACE_IRQFLAGS) && newcnt == cnt) {
-		raw_local_irq_save(flags);
-		lockdep_softirqs_off(ip);
-		raw_local_irq_restore(flags);
+		if (IS_ENABLED(CONFIG_TRACE_IRQFLAGS) && newcnt == cnt) {
+			raw_local_irq_save(flags);
+			lockdep_softirqs_off(ip);
+			raw_local_irq_restore(flags);
+		}
+	} else {
+		bool sirq_dis = false;
+
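+		/* First BH disable in this task? Needed for lockdep below. */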
+		if (!current->softirq_disable_cnt)
+			sirq_dis = true;
+
+		this_cpu_add(softirq_ctrl.cnt, cnt);
+		current->softirq_disable_cnt += cnt;
+		WARN_ON_ONCE(current->softirq_disable_cnt < 0);
+
+		if (IS_ENABLED(CONFIG_TRACE_IRQFLAGS) && sirq_dis) {
+			raw_local_irq_save(flags);
+			lockdep_softirqs_off(ip);
+			raw_local_irq_restore(flags);
+		}
 	}
 }
 EXPORT_SYMBOL(__local_bh_disable_ip);
@ kernel/softirq.c:219 @ EXPORT_SYMBOL(__local_bh_disable_ip);
 static void __local_bh_enable(unsigned int cnt, bool unlock)
 {
 	unsigned long flags;
+	bool sirq_en = false;
 	int newcnt;
 
-	DEBUG_LOCKS_WARN_ON(current->softirq_disable_cnt !=
-			    this_cpu_read(softirq_ctrl.cnt));
+	if (IS_ENABLED(CONFIG_PREEMPT_RT_NEEDS_BH_LOCK)) {
+		DEBUG_LOCKS_WARN_ON(current->softirq_disable_cnt !=
+				    this_cpu_read(softirq_ctrl.cnt));
+		if (softirq_count() == cnt)
+			sirq_en = true;
+	} else {
+		if (current->softirq_disable_cnt == cnt)
+			sirq_en = true;
+	}
 
-	if (IS_ENABLED(CONFIG_TRACE_IRQFLAGS) && softirq_count() == cnt) {
+	if (IS_ENABLED(CONFIG_TRACE_IRQFLAGS) && sirq_en) {
 		raw_local_irq_save(flags);
 		lockdep_softirqs_on(_RET_IP_);
 		raw_local_irq_restore(flags);
 	}
 
-	newcnt = __this_cpu_sub_return(softirq_ctrl.cnt, cnt);
-	current->softirq_disable_cnt = newcnt;
+	if (IS_ENABLED(CONFIG_PREEMPT_RT_NEEDS_BH_LOCK)) {
+		newcnt = this_cpu_sub_return(softirq_ctrl.cnt, cnt);
+		current->softirq_disable_cnt = newcnt;
 
-	if (!newcnt && unlock) {
-		rcu_read_unlock();
-		local_unlock(&softirq_ctrl.lock);
+		if (!newcnt && unlock) {
+			rcu_read_unlock();
+			local_unlock(&softirq_ctrl.lock);
+		}
+	} else {
+		current->softirq_disable_cnt -= cnt;
+		this_cpu_sub(softirq_ctrl.cnt, cnt);
+		if (unlock && !current->softirq_disable_cnt) {
+			migrate_enable();
+			rcu_read_unlock();
+		} else {
+			WARN_ON_ONCE(current->softirq_disable_cnt < 0);
+		}
 	}
 }
 
@ kernel/softirq.c:271 @ void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)
 	lock_map_release(&bh_lock_map);
 
 	local_irq_save(flags);
-	curcnt = __this_cpu_read(softirq_ctrl.cnt);
+	if (IS_ENABLED(CONFIG_PREEMPT_RT_NEEDS_BH_LOCK))
+		curcnt = this_cpu_read(softirq_ctrl.cnt);
+	else
+		curcnt = current->softirq_disable_cnt;
 
 	/*
 	 * If this is not reenabling soft interrupts, no point in trying to
@ kernel/softirq.c:851 @ static bool tasklet_clear_sched(struct tasklet_struct *t)
 	return false;
 }
 
+#ifdef CONFIG_PREEMPT_RT
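+/*
+ * On PREEMPT_RT tasklet callbacks run with the per-CPU cb_lock held. A
+ * canceling context waits for a running callback by acquiring this lock
+ * instead of repeatedly disabling and enabling BH.
+ */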
+struct tasklet_sync_callback {
+	spinlock_t	cb_lock;
+	atomic_t	cb_waiters;
+};
+
+static DEFINE_PER_CPU(struct tasklet_sync_callback, tasklet_sync_callback) = {
+	.cb_lock	= __SPIN_LOCK_UNLOCKED(tasklet_sync_callback.cb_lock),
+	.cb_waiters	= ATOMIC_INIT(0),
+};
+
+static void tasklet_lock_callback(void)
+{
+	spin_lock(this_cpu_ptr(&tasklet_sync_callback.cb_lock));
+}
+
+static void tasklet_unlock_callback(void)
+{
+	spin_unlock(this_cpu_ptr(&tasklet_sync_callback.cb_lock));
+}
+
+static void tasklet_callback_cancel_wait_running(void)
+{
+	struct tasklet_sync_callback *sync_cb = this_cpu_ptr(&tasklet_sync_callback);
+
+	atomic_inc(&sync_cb->cb_waiters);
+	spin_lock(&sync_cb->cb_lock);
+	atomic_dec(&sync_cb->cb_waiters);
+	spin_unlock(&sync_cb->cb_lock);
+}
+
+static void tasklet_callback_sync_wait_running(void)
+{
+	struct tasklet_sync_callback *sync_cb = this_cpu_ptr(&tasklet_sync_callback);
+
+	if (atomic_read(&sync_cb->cb_waiters)) {
+		spin_unlock(&sync_cb->cb_lock);
+		spin_lock(&sync_cb->cb_lock);
+	}
+}
+
+#else /* !CONFIG_PREEMPT_RT: */
+
+static void tasklet_lock_callback(void) { }
+static void tasklet_unlock_callback(void) { }
+static void tasklet_callback_sync_wait_running(void) { }
+
+#ifdef CONFIG_SMP
+static void tasklet_callback_cancel_wait_running(void) { }
+#endif
+#endif /* !CONFIG_PREEMPT_RT */
+
 static void tasklet_action_common(struct tasklet_head *tl_head,
 				  unsigned int softirq_nr)
 {
@ kernel/softirq.c:914 @ static void tasklet_action_common(struct tasklet_head *tl_head,
 	tl_head->tail = &tl_head->head;
 	local_irq_enable();
 
+	tasklet_lock_callback();
 	while (list) {
 		struct tasklet_struct *t = list;
 
@ kernel/softirq.c:934 @ static void tasklet_action_common(struct tasklet_head *tl_head,
 					}
 				}
 				tasklet_unlock(t);
+				tasklet_callback_sync_wait_running();
 				continue;
 			}
 			tasklet_unlock(t);
@ kernel/softirq.c:947 @ static void tasklet_action_common(struct tasklet_head *tl_head,
 		__raise_softirq_irqoff(softirq_nr);
 		local_irq_enable();
 	}
+	tasklet_unlock_callback();
 }
 
 static __latent_entropy void tasklet_action(void)
@ kernel/softirq.c:998 @ void tasklet_unlock_spin_wait(struct tasklet_struct *t)
 			/*
 			 * Prevent a live lock when current preempted soft
 			 * interrupt processing or prevents ksoftirqd from
-			 * running. If the tasklet runs on a different CPU
-			 * then this has no effect other than doing the BH
-			 * disable/enable dance for nothing.
+			 * running.
 			 */
-			local_bh_disable();
-			local_bh_enable();
+			tasklet_callback_cancel_wait_running();
 		} else {
 			cpu_relax();
 		}
@ kernel/workqueue.c:225 @ struct worker_pool {
 	struct workqueue_attrs	*attrs;		/* I: worker attributes */
 	struct hlist_node	hash_node;	/* PL: unbound_pool_hash node */
 	int			refcnt;		/* PL: refcnt for unbound pools */
-
+#ifdef CONFIG_PREEMPT_RT
+	spinlock_t		cb_lock;	/* BH worker cancel lock */
+#endif
 	/*
 	 * Destruction of pool is RCU protected to allow dereferences
 	 * from get_work_pool().
@ kernel/workqueue.c:3083 @ __acquires(&pool->lock)
 		goto restart;
 }
 
+#ifdef CONFIG_PREEMPT_RT
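+/*
+ * On PREEMPT_RT the BH worker runs with pool->cb_lock held so that
+ * __flush_work() can wait for a running BH work item by acquiring the
+ * lock instead of flipping BH.
+ */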
+static void worker_lock_callback(struct worker_pool *pool)
+{
+	spin_lock(&pool->cb_lock);
+}
+
+static void worker_unlock_callback(struct worker_pool *pool)
+{
+	spin_unlock(&pool->cb_lock);
+}
+
+static void workqueue_callback_cancel_wait_running(struct worker_pool *pool)
+{
+	spin_lock(&pool->cb_lock);
+	spin_unlock(&pool->cb_lock);
+}
+
+#else
+
+static void worker_lock_callback(struct worker_pool *pool) { }
+static void worker_unlock_callback(struct worker_pool *pool) { }
+static void workqueue_callback_cancel_wait_running(struct worker_pool *pool) { }
+
+#endif
+
 /**
  * manage_workers - manage worker pool
  * @worker: self
@ kernel/workqueue.c:3587 @ static void bh_worker(struct worker *worker)
 	int nr_restarts = BH_WORKER_RESTARTS;
 	unsigned long end = jiffies + BH_WORKER_JIFFIES;
 
+	worker_lock_callback(pool);
 	raw_spin_lock_irq(&pool->lock);
 	worker_leave_idle(worker);
 
@ kernel/workqueue.c:3616 @ static void bh_worker(struct worker *worker)
 	worker_enter_idle(worker);
 	kick_pool(pool);
 	raw_spin_unlock_irq(&pool->lock);
+	worker_unlock_callback(pool);
 }
 
 /*
@ kernel/workqueue.c:4254 @ static bool __flush_work(struct work_struct *work, bool from_cancel)
 		    (data & WORK_OFFQ_BH)) {
 			/*
 			 * On RT, prevent a live lock when %current preempted
-			 * soft interrupt processing or prevents ksoftirqd from
-			 * running by keeping flipping BH. If the BH work item
-			 * runs on a different CPU then this has no effect other
-			 * than doing the BH disable/enable dance for nothing.
-			 * This is copied from
-			 * kernel/softirq.c::tasklet_unlock_spin_wait().
+			 * soft interrupt processing by blocking on lock which
+			 * is owned by the thread invoking the callback.
 			 */
 			while (!try_wait_for_completion(&barr.done)) {
 				if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
-					local_bh_disable();
-					local_bh_enable();
+					struct worker_pool *pool;
+
+					guard(rcu)();
+					pool = get_work_pool(work);
+					if (pool)
+						workqueue_callback_cancel_wait_running(pool);
 				} else {
 					cpu_relax();
 				}
@ kernel/workqueue.c:4814 @ static int init_worker_pool(struct worker_pool *pool)
 	ida_init(&pool->worker_ida);
 	INIT_HLIST_NODE(&pool->hash_node);
 	pool->refcnt = 1;
+#ifdef CONFIG_PREEMPT_RT
+	spin_lock_init(&pool->cb_lock);
+#endif
 
 	/* shouldn't fail above this point */
 	pool->attrs = alloc_workqueue_attrs();
@ localversion-rt:1 @
+-rt5
@ net/core/gro_cells.c:11 @
 struct gro_cell {
 	struct sk_buff_head	napi_skbs;
 	struct napi_struct	napi;
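+	/* Serializes napi_skbs access on PREEMPT_RT (preemptible BH). */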
+	local_lock_t		bh_lock;
 };
 
 int gro_cells_receive(struct gro_cells *gcells, struct sk_buff *skb)
 {
 	struct net_device *dev = skb->dev;
+	bool have_bh_lock = false;
 	struct gro_cell *cell;
 	int res;
 
@ net/core/gro_cells.c:30 @ int gro_cells_receive(struct gro_cells *gcells, struct sk_buff *skb)
 		goto unlock;
 	}
 
+	local_lock_nested_bh(&gcells->cells->bh_lock);
+	have_bh_lock = true;
 	cell = this_cpu_ptr(gcells->cells);
 
 	if (skb_queue_len(&cell->napi_skbs) > READ_ONCE(net_hotdata.max_backlog)) {
@ net/core/gro_cells.c:46 @ int gro_cells_receive(struct gro_cells *gcells, struct sk_buff *skb)
 	if (skb_queue_len(&cell->napi_skbs) == 1)
 		napi_schedule(&cell->napi);
 
+	if (have_bh_lock)
+		local_unlock_nested_bh(&gcells->cells->bh_lock);
+
 	res = NET_RX_SUCCESS;
 
 unlock:
@ net/core/gro_cells.c:64 @ static int gro_cell_poll(struct napi_struct *napi, int budget)
 	struct sk_buff *skb;
 	int work_done = 0;
 
+	__local_lock_nested_bh(&cell->bh_lock);
 	while (work_done < budget) {
 		skb = __skb_dequeue(&cell->napi_skbs);
 		if (!skb)
@ net/core/gro_cells.c:75 @ static int gro_cell_poll(struct napi_struct *napi, int budget)
 
 	if (work_done < budget)
 		napi_complete_done(napi, work_done);
+	__local_unlock_nested_bh(&cell->bh_lock);
 	return work_done;
 }
 
@ net/core/gro_cells.c:91 @ int gro_cells_init(struct gro_cells *gcells, struct net_device *dev)
 		struct gro_cell *cell = per_cpu_ptr(gcells->cells, i);
 
 		__skb_queue_head_init(&cell->napi_skbs);
+		local_lock_init(&cell->bh_lock);
 
 		set_bit(NAPI_STATE_NO_BUSY_POLL, &cell->napi.state);
 
@ net/netfilter/nf_conntrack_netlink.c:63 @ MODULE_LICENSE("GPL");
 MODULE_DESCRIPTION("List and change connection tracking table");
 
 struct ctnetlink_list_dump_ctx {
-	struct nf_conn *last;
+	unsigned long last_id;
 	unsigned int cpu;
 	bool done;
 };
@ net/netfilter/nf_conntrack_netlink.c:1736 @ static int ctnetlink_get_conntrack(struct sk_buff *skb,
 	return nfnetlink_unicast(skb2, info->net, NETLINK_CB(skb).portid);
 }
 
-static int ctnetlink_done_list(struct netlink_callback *cb)
-{
-	struct ctnetlink_list_dump_ctx *ctx = (void *)cb->ctx;
-
-	if (ctx->last)
-		nf_ct_put(ctx->last);
-
-	return 0;
-}
-
 #ifdef CONFIG_NF_CONNTRACK_EVENTS
 static int ctnetlink_dump_one_entry(struct sk_buff *skb,
 				    struct netlink_callback *cb,
@ net/netfilter/nf_conntrack_netlink.c:1750 @ static int ctnetlink_dump_one_entry(struct sk_buff *skb,
 	if (l3proto && nf_ct_l3num(ct) != l3proto)
 		return 0;
 
-	if (ctx->last) {
-		if (ct != ctx->last)
+	if (ctx->last_id) {
+		if (ctnetlink_get_id(ct) != ctx->last_id)
 			return 0;
 
-		ctx->last = NULL;
+		ctx->last_id = 0;
 	}
 
 	/* We can't dump extension info for the unconfirmed
@ net/netfilter/nf_conntrack_netlink.c:1768 @ static int ctnetlink_dump_one_entry(struct sk_buff *skb,
 				  cb->nlh->nlmsg_seq,
 				  NFNL_MSG_TYPE(cb->nlh->nlmsg_type),
 				  ct, dying, 0);
-	if (res < 0) {
-		if (!refcount_inc_not_zero(&ct->ct_general.use))
-			return 0;
-
-		ctx->last = ct;
-	}
+	if (res < 0)
+		ctx->last_id = ctnetlink_get_id(ct);
 
 	return res;
 }
@ net/netfilter/nf_conntrack_netlink.c:1785 @ static int
 ctnetlink_dump_dying(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	struct ctnetlink_list_dump_ctx *ctx = (void *)cb->ctx;
-	struct nf_conn *last = ctx->last;
 #ifdef CONFIG_NF_CONNTRACK_EVENTS
 	const struct net *net = sock_net(skb->sk);
 	struct nf_conntrack_net_ecache *ecache_net;
+	unsigned long last_id = ctx->last_id;
 	struct nf_conntrack_tuple_hash *h;
 	struct hlist_nulls_node *n;
 #endif
@ net/netfilter/nf_conntrack_netlink.c:1796 @ ctnetlink_dump_dying(struct sk_buff *skb, struct netlink_callback *cb)
 	if (ctx->done)
 		return 0;
 
-	ctx->last = NULL;
+	ctx->last_id = 0;
 
 #ifdef CONFIG_NF_CONNTRACK_EVENTS
 	ecache_net = nf_conn_pernet_ecache(net);
@ net/netfilter/nf_conntrack_netlink.c:1807 @ ctnetlink_dump_dying(struct sk_buff *skb, struct netlink_callback *cb)
 		int res;
 
 		ct = nf_ct_tuplehash_to_ctrack(h);
-		if (last && last != ct)
+		if (last_id && last_id != ctnetlink_get_id(ct))
 			continue;
 
 		res = ctnetlink_dump_one_entry(skb, cb, ct, true);
 		if (res < 0) {
 			spin_unlock_bh(&ecache_net->dying_lock);
-			nf_ct_put(last);
 			return skb->len;
 		}
 
-		nf_ct_put(last);
-		last = NULL;
+		last_id = 0;
 	}
 
 	spin_unlock_bh(&ecache_net->dying_lock);
 #endif
 	ctx->done = true;
-	nf_ct_put(last);
 
 	return skb->len;
 }
@ net/netfilter/nf_conntrack_netlink.c:1833 @ static int ctnetlink_get_ct_dying(struct sk_buff *skb,
 	if (info->nlh->nlmsg_flags & NLM_F_DUMP) {
 		struct netlink_dump_control c = {
 			.dump = ctnetlink_dump_dying,
-			.done = ctnetlink_done_list,
 		};
 		return netlink_dump_start(info->sk, skb, info->nlh, &c);
 	}
@ net/netfilter/nf_conntrack_netlink.c:1847 @ static int ctnetlink_get_ct_unconfirmed(struct sk_buff *skb,
 	if (info->nlh->nlmsg_flags & NLM_F_DUMP) {
 		struct netlink_dump_control c = {
 			.dump = ctnetlink_dump_unconfirmed,
-			.done = ctnetlink_done_list,
 		};
 		return netlink_dump_start(info->sk, skb, info->nlh, &c);
 	}
@ net/netfilter/nft_set_pipapo.c:400 @ int pipapo_refill(unsigned long *map, unsigned int len, unsigned int rules,
 }
 
 /**
- * pipapo_get() - Get matching element reference given key data
+ * pipapo_get_slow() - Get matching element reference given key data
  * @m:		storage containing the set elements
  * @data:	Key data to be matched against existing elements
  * @genmask:	If set, check that element is active in given genmask
@ net/netfilter/nft_set_pipapo.c:417 @ int pipapo_refill(unsigned long *map, unsigned int len, unsigned int rules,
  *
  * Return: pointer to &struct nft_pipapo_elem on match, NULL otherwise.
  */
-static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
-					  const u8 *data, u8 genmask,
-					  u64 tstamp)
+static struct nft_pipapo_elem *pipapo_get_slow(const struct nft_pipapo_match *m,
+					       const u8 *data, u8 genmask,
+					       u64 tstamp)
 {
+	unsigned long *res_map, *fill_map, *map;
 	struct nft_pipapo_scratch *scratch;
-	unsigned long *res_map, *fill_map;
 	const struct nft_pipapo_field *f;
 	bool map_index;
 	int i;
@ net/netfilter/nft_set_pipapo.c:432 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
 	scratch = *raw_cpu_ptr(m->scratch);
 	if (unlikely(!scratch))
 		goto out;
+	__local_lock_nested_bh(&scratch->bh_lock);
 
 	map_index = scratch->map_index;
 
-	res_map  = scratch->map + (map_index ? m->bsize_max : 0);
-	fill_map = scratch->map + (map_index ? 0 : m->bsize_max);
+	map = NFT_PIPAPO_LT_ALIGN(&scratch->__map[0]);
+	res_map  = map + (map_index ? m->bsize_max : 0);
+	fill_map = map + (map_index ? 0 : m->bsize_max);
 
 	pipapo_resmap_init(m, res_map);
 
@ net/netfilter/nft_set_pipapo.c:469 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
 				  last);
 		if (b < 0) {
 			scratch->map_index = map_index;
+			__local_unlock_nested_bh(&scratch->bh_lock);
 			local_bh_enable();
 
 			return NULL;
@ net/netfilter/nft_set_pipapo.c:489 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
 			 * *next* bitmap (not initial) for the next packet.
 			 */
 			scratch->map_index = map_index;
+			__local_unlock_nested_bh(&scratch->bh_lock);
 			local_bh_enable();
 			return e;
 		}
@ net/netfilter/nft_set_pipapo.c:504 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
 		data += NFT_PIPAPO_GROUPS_PADDING(f);
 	}
 
+	__local_unlock_nested_bh(&scratch->bh_lock);
 out:
 	local_bh_enable();
 	return NULL;
 }
 
+/**
+ * pipapo_get() - Get matching element reference given key data
+ * @m:		Storage containing the set elements
+ * @data:	Key data to be matched against existing elements
+ * @genmask:	If set, check that element is active in given genmask
+ * @tstamp:	Timestamp to check for expired elements
+ *
+ * This is a dispatcher function which calls either the generic C
+ * implementation or, if available, the AVX2 one.
+ * This helper is only called from the control plane, with either RCU
+ * read lock or transaction mutex held.
+ *
+ * Return: pointer to &struct nft_pipapo_elem on match, NULL otherwise.
+ */
+static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
+					  const u8 *data, u8 genmask,
+					  u64 tstamp)
+{
+	struct nft_pipapo_elem *e;
+
+	local_bh_disable();
+
+#if defined(CONFIG_X86_64) && !defined(CONFIG_UML)
+	if (boot_cpu_has(X86_FEATURE_AVX2) && boot_cpu_has(X86_FEATURE_AVX) &&
+	    irq_fpu_usable()) {
+		e = pipapo_get_avx2(m, data, genmask, tstamp);
+		local_bh_enable();
+		return e;
+	}
+#endif
+	e = pipapo_get_slow(m, data, genmask, tstamp);
+	local_bh_enable();
+	return e;
+}
+
 /**
  * nft_pipapo_lookup() - Dataplane fronted for main lookup function
  * @net:	Network namespace
@ net/netfilter/nft_set_pipapo.c:553 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
  *
  * This function is called from the data path.  It will search for
  * an element matching the given key in the current active copy.
- * Unlike other set types, this uses NFT_GENMASK_ANY instead of
- * nft_genmask_cur().
+ * Unlike other set types, this uses 0 instead of nft_genmask_cur().
  *
  * This is because new (future) elements are not reachable from
  * priv->match, they get added to priv->clone instead.
@ net/netfilter/nft_set_pipapo.c:563 @ static struct nft_pipapo_elem *pipapo_get(const struct nft_pipapo_match *m,
  * inconsistent state: matching old entries get skipped but thew
  * newly matching entries are unreachable.
  *
- * GENMASK will still find the 'now old' entries which ensures consistent
- * priv->match view.
+ * GENMASK_ANY doesn't work for the same reason: old-gen entries get
+ * skipped, new-gen entries are only reachable from priv->clone.
  *
  * nft_pipapo_commit swaps ->clone and ->match shortly after the
  * genbit flip.  As ->clone doesn't contain the old entries in the first
@ net/netfilter/nft_set_pipapo.c:581 @ nft_pipapo_lookup(const struct net *net, const struct nft_set *set,
 	const struct nft_pipapo_elem *e;
 
 	m = rcu_dereference(priv->match);
-	e = pipapo_get(m, (const u8 *)key, NFT_GENMASK_ANY, get_jiffies_64());
+	e = pipapo_get_slow(m, (const u8 *)key, 0, get_jiffies_64());
 
 	return e ? &e->ext : NULL;
 }
@ net/netfilter/nft_set_pipapo.c:1194 @ static void pipapo_map(struct nft_pipapo_match *m,
 }
 
 /**
- * pipapo_free_scratch() - Free per-CPU map at original (not aligned) address
+ * pipapo_free_scratch() - Free per-CPU scratch map
  * @m:		Matching data
  * @cpu:	CPU number
  */
 static void pipapo_free_scratch(const struct nft_pipapo_match *m, unsigned int cpu)
 {
 	struct nft_pipapo_scratch *s;
-	void *mem;
 
 	s = *per_cpu_ptr(m->scratch, cpu);
-	if (!s)
-		return;
 
-	mem = s;
-	mem -= s->align_off;
-	kvfree(mem);
+	kvfree(s);
 }
 
 /**
@ net/netfilter/nft_set_pipapo.c:1221 @ static int pipapo_realloc_scratch(struct nft_pipapo_match *clone,
 
 	for_each_possible_cpu(i) {
 		struct nft_pipapo_scratch *scratch;
-#ifdef NFT_PIPAPO_ALIGN
-		void *scratch_aligned;
-		u32 align_off;
-#endif
-		scratch = kvzalloc_node(struct_size(scratch, map, bsize_max * 2) +
+
+		scratch = kvzalloc_node(struct_size(scratch, __map, bsize_max * 2) +
 					NFT_PIPAPO_ALIGN_HEADROOM,
 					GFP_KERNEL_ACCOUNT, cpu_to_node(i));
 		if (!scratch) {
@ net/netfilter/nft_set_pipapo.c:1237 @ static int pipapo_realloc_scratch(struct nft_pipapo_match *clone,
 		}
 
 		pipapo_free_scratch(clone, i);
-
-#ifdef NFT_PIPAPO_ALIGN
-		/* Align &scratch->map (not the struct itself): the extra
-		 * %NFT_PIPAPO_ALIGN_HEADROOM bytes passed to kzalloc_node()
-		 * above guarantee we can waste up to those bytes in order
-		 * to align the map field regardless of its offset within
-		 * the struct.
-		 */
-		BUILD_BUG_ON(offsetof(struct nft_pipapo_scratch, map) > NFT_PIPAPO_ALIGN_HEADROOM);
-
-		scratch_aligned = NFT_PIPAPO_LT_ALIGN(&scratch->map);
-		scratch_aligned -= offsetof(struct nft_pipapo_scratch, map);
-		align_off = scratch_aligned - (void *)scratch;
-
-		scratch = scratch_aligned;
-		scratch->align_off = align_off;
-#endif
+		local_lock_init(&scratch->bh_lock);
 		*per_cpu_ptr(clone->scratch, i) = scratch;
 	}
 
@ net/netfilter/nft_set_pipapo.h:127 @ struct nft_pipapo_field {
 
 /**
  * struct nft_pipapo_scratch - percpu data used for lookup and matching
+ * @bh_lock:    PREEMPT_RT local spinlock
  * @map_index:	Current working bitmap index, toggled between field matches
- * @align_off:	Offset to get the originally allocated address
- * @map:	store partial matching results during lookup
+ * @__map:	store partial matching results during lookup
  */
 struct nft_pipapo_scratch {
+	local_lock_t bh_lock;
 	u8 map_index;
-	u32 align_off;
-	unsigned long map[];
+	unsigned long __map[];
 };
 
 /**
@ net/netfilter/nft_set_pipapo_avx2.c:1136 @ static inline void pipapo_resmap_init_avx2(const struct nft_pipapo_match *m, uns
 }
 
 /**
- * nft_pipapo_avx2_lookup() - Lookup function for AVX2 implementation
- * @net:	Network namespace
- * @set:	nftables API set representation
- * @key:	nftables API element representation containing key data
+ * pipapo_get_avx2() - Lookup function for AVX2 implementation
+ * @m:		Storage containing the set elements
+ * @data:	Key data to be matched against existing elements
+ * @genmask:	If set, check that element is active in given genmask
+ * @tstamp:	Timestamp to check for expired elements
  *
  * For more details, see DOC: Theory of Operation in nft_set_pipapo.c.
  *
  * This implementation exploits the repetitive characteristic of the algorithm
  * to provide a fast, vectorised version using the AVX2 SIMD instruction set.
  *
- * Return: true on match, false otherwise.
+ * The caller must check that the FPU is usable.
+ * This function must be called with BH disabled.
+ *
+ * Return: pointer to &struct nft_pipapo_elem on match, NULL otherwise.
  */
-const struct nft_set_ext *
-nft_pipapo_avx2_lookup(const struct net *net, const struct nft_set *set,
-		       const u32 *key)
+struct nft_pipapo_elem *pipapo_get_avx2(const struct nft_pipapo_match *m,
+					const u8 *data, u8 genmask,
+					u64 tstamp)
 {
-	struct nft_pipapo *priv = nft_set_priv(set);
-	const struct nft_set_ext *ext = NULL;
 	struct nft_pipapo_scratch *scratch;
-	const struct nft_pipapo_match *m;
 	const struct nft_pipapo_field *f;
-	const u8 *rp = (const u8 *)key;
-	unsigned long *res, *fill;
+	unsigned long *res, *fill, *map;
 	bool map_index;
 	int i;
 
-	local_bh_disable();
+	scratch = *raw_cpu_ptr(m->scratch);
+	if (unlikely(!scratch))
+		return NULL;
 
-	if (unlikely(!irq_fpu_usable())) {
-		ext = nft_pipapo_lookup(net, set, key);
+	__local_lock_nested_bh(&scratch->bh_lock);
+	map_index = scratch->map_index;
+	map = NFT_PIPAPO_LT_ALIGN(&scratch->__map[0]);
+	res  = map + (map_index ? m->bsize_max : 0);
+	fill = map + (map_index ? 0 : m->bsize_max);
 
-		local_bh_enable();
-		return ext;
-	}
+	pipapo_resmap_init_avx2(m, res);
 
-	m = rcu_dereference(priv->match);
-
-	/* This also protects access to all data related to scratch maps.
-	 *
-	 * Note that we don't need a valid MXCSR state for any of the
+	/* Note that we don't need a valid MXCSR state for any of the
 	 * operations we use here, so pass 0 as mask and spare a LDMXCSR
 	 * instruction.
 	 */
 	kernel_fpu_begin_mask(0);
 
-	scratch = *raw_cpu_ptr(m->scratch);
-	if (unlikely(!scratch)) {
-		kernel_fpu_end();
-		local_bh_enable();
-		return NULL;
-	}
-
-	map_index = scratch->map_index;
-
-	res  = scratch->map + (map_index ? m->bsize_max : 0);
-	fill = scratch->map + (map_index ? 0 : m->bsize_max);
-
-	pipapo_resmap_init_avx2(m, res);
-
 	nft_pipapo_avx2_prepare();
 
-next_match:
 	nft_pipapo_for_each_field(f, i, m) {
 		bool last = i == m->field_count - 1, first = !i;
 		int ret = 0;
 
 #define NFT_SET_PIPAPO_AVX2_LOOKUP(b, n)				\
 		(ret = nft_pipapo_avx2_lookup_##b##b_##n(res, fill, f,	\
-							 ret, rp,	\
+							 ret, data,	\
 							 first, last))
 
 		if (likely(f->bb == 8)) {
@ net/netfilter/nft_set_pipapo_avx2.c:1204 @ nft_pipapo_avx2_lookup(const struct net *net, const struct nft_set *set,
 				NFT_SET_PIPAPO_AVX2_LOOKUP(8, 16);
 			} else {
 				ret = nft_pipapo_avx2_lookup_slow(m, res, fill, f,
-								  ret, rp,
+								  ret, data,
 								  first, last);
 			}
 		} else {
@ net/netfilter/nft_set_pipapo_avx2.c:1220 @ nft_pipapo_avx2_lookup(const struct net *net, const struct nft_set *set,
 				NFT_SET_PIPAPO_AVX2_LOOKUP(4, 32);
 			} else {
 				ret = nft_pipapo_avx2_lookup_slow(m, res, fill, f,
-								  ret, rp,
+								  ret, data,
 								  first, last);
 			}
 		}
@ net/netfilter/nft_set_pipapo_avx2.c:1228 @ nft_pipapo_avx2_lookup(const struct net *net, const struct nft_set *set,
 
 #undef NFT_SET_PIPAPO_AVX2_LOOKUP
 
-		if (ret < 0)
-			goto out;
-
-		if (last) {
-			const struct nft_set_ext *e = &f->mt[ret].e->ext;
-
-			if (unlikely(nft_set_elem_expired(e)))
-				goto next_match;
-
-			ext = e;
-			goto out;
+next_match:
+		if (ret < 0) {
+			scratch->map_index = map_index;
+			kernel_fpu_end();
+			__local_unlock_nested_bh(&scratch->bh_lock);
+			return NULL;
 		}
 
+		if (last) {
+			struct nft_pipapo_elem *e;
+
+			e = f->mt[ret].e;
+			if (unlikely(__nft_set_elem_expired(&e->ext, tstamp) ||
+				     !nft_set_elem_active(&e->ext, genmask))) {
+				ret = pipapo_refill(res, f->bsize, f->rules,
+						    fill, f->mt, last);
+				goto next_match;
+			}
+
+			scratch->map_index = map_index;
+			kernel_fpu_end();
+			__local_unlock_nested_bh(&scratch->bh_lock);
+			return e;
+		}
+
+		map_index = !map_index;
 		swap(res, fill);
-		rp += NFT_PIPAPO_GROUPS_PADDED_SIZE(f);
+		data += NFT_PIPAPO_GROUPS_PADDED_SIZE(f);
 	}
 
-out:
-	if (i % 2)
-		scratch->map_index = !map_index;
 	kernel_fpu_end();
+	__local_unlock_nested_bh(&scratch->bh_lock);
+	return NULL;
+}
+
+/**
+ * nft_pipapo_avx2_lookup() - Dataplane frontend for AVX2 implementation
+ * @net:	Network namespace
+ * @set:	nftables API set representation
+ * @key:	nftables API element representation containing key data
+ *
+ * This function is called from the data path.  It will search for
+ * an element matching the given key in the current active copy using
+ * the AVX2 routines if the FPU is usable or fall back to the generic
+ * implementation of the algorithm otherwise.
+ *
+ * Return: nftables API extension pointer or NULL if no match.
+ */
+const struct nft_set_ext *
+nft_pipapo_avx2_lookup(const struct net *net, const struct nft_set *set,
+		       const u32 *key)
+{
+	struct nft_pipapo *priv = nft_set_priv(set);
+	const struct nft_pipapo_match *m;
+	const u8 *rp = (const u8 *)key;
+	const struct nft_pipapo_elem *e;
+
+	local_bh_disable();
+
+	if (unlikely(!irq_fpu_usable())) {
+		const struct nft_set_ext *ext;
+
+		ext = nft_pipapo_lookup(net, set, key);
+
+		local_bh_enable();
+		return ext;
+	}
+
+	m = rcu_dereference(priv->match);
+
+	e = pipapo_get_avx2(m, rp, 0, get_jiffies_64());
 	local_bh_enable();
 
-	return ext;
+	return e ? &e->ext : NULL;
 }
@ net/netfilter/nft_set_pipapo_avx2.h:8 @
 #include <asm/fpu/xstate.h>
 #define NFT_PIPAPO_ALIGN	(XSAVE_YMM_SIZE / BITS_PER_BYTE)
 
+struct nft_pipapo_match;
 bool nft_pipapo_avx2_estimate(const struct nft_set_desc *desc, u32 features,
 			      struct nft_set_estimate *est);
+struct nft_pipapo_elem *pipapo_get_avx2(const struct nft_pipapo_match *m,
+					const u8 *data, u8 genmask,
+					u64 tstamp);
 #endif /* defined(CONFIG_X86_64) && !defined(CONFIG_UML) */
 
 #endif /* _NFT_SET_PIPAPO_AVX2_H */