From d06529b2e4629f4c88a0a777a61839adf2893b2e Mon Sep 17 00:00:00 2001
From: John Ogness <john.ogness@linutronix.de>
Date: Mon, 30 Nov 2020 01:42:07 +0106
Subject: [PATCH 012/158] printk: move console printing to kthreads

Create a kthread for each console to perform console printing. Now
all console printing is fully asynchronous except for the boot
console and when the kernel enters sync mode (and there are atomic
consoles available).

The console_lock() and console_unlock() functions now only do what
their name says... locking and unlocking of the console.

Signed-off-by: John Ogness <john.ogness@linutronix.de>
Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
---
 include/linux/console.h |  13 +
 kernel/printk/printk.c  | 717 +++++++++++++---------------------------
 2 files changed, 237 insertions(+), 493 deletions(-)

Index: linux-5.15.19-rt29/include/linux/console.h
===================================================================
--- linux-5.15.19-rt29.orig/include/linux/console.h
+++ linux-5.15.19-rt29/include/linux/console.h
@@ -17,6 +17,12 @@
 #include <linux/atomic.h>
 #include <linux/types.h>
 #include <linux/printk.h>
+#include <linux/seqlock.h>
+
+struct latched_seq {
+	seqcount_latch_t	latch;
+	u64			val[2];
+};
 
 struct vc_data;
 struct console_font_op;
@@ -153,7 +159,14 @@ struct console {
 	int	cflag;
 #ifdef CONFIG_PRINTK
 	char	sync_buf[CONSOLE_LOG_MAX];
+	struct latched_seq printk_seq;
+	struct latched_seq printk_sync_seq;
+#ifdef CONFIG_HAVE_NMI
+	struct latched_seq printk_sync_nmi_seq;
 #endif
+#endif /* CONFIG_PRINTK */
+
+	struct task_struct *thread;
 	uint	ispeed;
 	uint	ospeed;
 	void	*data;
Index: linux-5.15.19-rt29/kernel/printk/printk.c
===================================================================
--- linux-5.15.19-rt29.orig/kernel/printk/printk.c
+++ linux-5.15.19-rt29/kernel/printk/printk.c
@@ -45,6 +45,7 @@
 #include <linux/ctype.h>
 #include <linux/uio.h>
 #include <linux/kgdb.h>
+#include <linux/kthread.h>
 #include <linux/clocksource.h>
 #include <linux/sched/clock.h>
 #include <linux/sched/debug.h>
@@ -269,11 +270,6 @@ static void __up_console_sem(unsigned lo
 static int console_locked, console_suspended;
 
 /*
- * If exclusive_console is non-NULL then only this console is to be printed to.
- */
-static struct console *exclusive_console;
-
-/*
  *	Array of consoles built from command line options (console=)
  */
 
@@ -352,10 +348,10 @@ static int console_msg_format = MSG_FORM
  * non-prinatable characters are escaped in the "\xff" notation.
  */
 
+#ifdef CONFIG_PRINTK
 /* syslog_lock protects syslog_* variables and write access to clear_seq. */
 static DEFINE_MUTEX(syslog_lock);
 
-#ifdef CONFIG_PRINTK
 /* Set to enable sync mode. Once set, it is never cleared. */
 static bool sync_mode;
 
@@ -366,40 +362,6 @@ static u64 syslog_seq;
 static size_t syslog_partial;
 static bool syslog_time;
 
-/* Both protected by @console_sem. */
-static u64 exclusive_console_stop_seq;
-static unsigned long console_dropped;
-
-struct latched_seq {
-	seqcount_latch_t	latch;
-	u64			val[2];
-};
-
-/*
- * The next printk record to write to the console. There are two
- * copies (updated with seqcount_latch) so that reads can locklessly
- * access a valid value. Writers are synchronized by @console_sem.
- */
-static struct latched_seq console_seq = {
-	.latch		= SEQCNT_LATCH_ZERO(console_seq.latch),
-	.val[0]		= 0,
-	.val[1]		= 0,
-};
-
-static struct latched_seq console_sync_seq = {
-	.latch		= SEQCNT_LATCH_ZERO(console_sync_seq.latch),
-	.val[0]		= 0,
-	.val[1]		= 0,
-};
-
-#ifdef CONFIG_HAVE_NMI
-static struct latched_seq console_sync_nmi_seq = {
-	.latch		= SEQCNT_LATCH_ZERO(console_sync_nmi_seq.latch),
-	.val[0]		= 0,
-	.val[1]		= 0,
-};
-#endif
-
 /*
  * The next printk record to read after the last 'clear' command. There are
  * two copies (updated with seqcount_latch) so that reads can locklessly
@@ -1799,6 +1761,8 @@ static bool console_may_sync(struct cons
 		return false;
 	if (con->write_atomic && kernel_sync_mode())
 		return true;
+	if (con->write && (con->flags & CON_BOOT) && !con->thread)
+		return true;
 	return false;
 }
 
@@ -1806,12 +1770,21 @@ static bool call_sync_console_driver(str
 {
 	if (!(con->flags & CON_ENABLED))
 		return false;
-	if (con->write_atomic && kernel_sync_mode())
+
+	if (con->write_atomic && kernel_sync_mode()) {
 		con->write_atomic(con, text, text_len);
-	else
-		return false;
+		return true;
+	}
 
-	return true;
+	if (con->write && (con->flags & CON_BOOT) && !con->thread) {
+		if (console_trylock()) {
+			con->write(con, text, text_len);
+			console_unlock();
+			return true;
+		}
+	}
+
+	return false;
 }
 
 static bool have_atomic_console(void)
@@ -1856,24 +1829,24 @@ static bool print_sync(struct console *c
 	return true;
 }
 
-static u64 read_console_seq(void)
+static u64 read_console_seq(struct console *con)
 {
 	u64 seq2;
 	u64 seq;
 
-	seq = latched_seq_read_nolock(&console_seq);
-	seq2 = latched_seq_read_nolock(&console_sync_seq);
+	seq = latched_seq_read_nolock(&con->printk_seq);
+	seq2 = latched_seq_read_nolock(&con->printk_sync_seq);
 	if (seq2 > seq)
 		seq = seq2;
 #ifdef CONFIG_HAVE_NMI
-	seq2 = latched_seq_read_nolock(&console_sync_nmi_seq);
+	seq2 = latched_seq_read_nolock(&con->printk_sync_nmi_seq);
 	if (seq2 > seq)
 		seq = seq2;
 #endif
 	return seq;
 }
 
-static void print_sync_until(struct console *con, u64 seq)
+static void print_sync_until(struct console *con, u64 seq, bool is_locked)
 {
 	u64 printk_seq;
 
@@ -1881,210 +1854,26 @@ static void print_sync_until(struct cons
 		cpu_relax();
 
 	for (;;) {
-		printk_seq = read_console_seq();
+		printk_seq = read_console_seq(con);
 		if (printk_seq >= seq)
 			break;
 		if (!print_sync(con, &printk_seq))
 			break;
+
+		if (is_locked)
+			latched_seq_write(&con->printk_seq, printk_seq + 1);
 #ifdef CONFIG_PRINTK_NMI
-		if (in_nmi()) {
-			latched_seq_write(&console_sync_nmi_seq, printk_seq + 1);
-			continue;
-		}
+		else if (in_nmi())
+			latched_seq_write(&con->printk_sync_nmi_seq, printk_seq + 1);
 #endif
-		latched_seq_write(&console_sync_seq, printk_seq + 1);
+		else
+			latched_seq_write(&con->printk_sync_seq, printk_seq + 1);
 	}
 
 	__printk_cpu_unlock();
 }
 
 /*
- * Special console_lock variants that help to reduce the risk of soft-lockups.
- * They allow to pass console_lock to another printk() call using a busy wait.
- */
-
-#ifdef CONFIG_LOCKDEP
-static struct lockdep_map console_owner_dep_map = {
-	.name = "console_owner"
-};
-#endif
-
-static DEFINE_RAW_SPINLOCK(console_owner_lock);
-static struct task_struct *console_owner;
-static bool console_waiter;
-
-/**
- * console_lock_spinning_enable - mark beginning of code where another
- *	thread might safely busy wait
- *
- * This basically converts console_lock into a spinlock. This marks
- * the section where the console_lock owner can not sleep, because
- * there may be a waiter spinning (like a spinlock). Also it must be
- * ready to hand over the lock at the end of the section.
- */
-static void console_lock_spinning_enable(void)
-{
-	raw_spin_lock(&console_owner_lock);
-	console_owner = current;
-	raw_spin_unlock(&console_owner_lock);
-
-	/* The waiter may spin on us after setting console_owner */
-	spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_);
-}
-
-/**
- * console_lock_spinning_disable_and_check - mark end of code where another
- *	thread was able to busy wait and check if there is a waiter
- *
- * This is called at the end of the section where spinning is allowed.
- * It has two functions. First, it is a signal that it is no longer
- * safe to start busy waiting for the lock. Second, it checks if
- * there is a busy waiter and passes the lock rights to her.
- *
- * Important: Callers lose the lock if there was a busy waiter.
- *	They must not touch items synchronized by console_lock
- *	in this case.
- *
- * Return: 1 if the lock rights were passed, 0 otherwise.
- */
-static int console_lock_spinning_disable_and_check(void)
-{
-	int waiter;
-
-	raw_spin_lock(&console_owner_lock);
-	waiter = READ_ONCE(console_waiter);
-	console_owner = NULL;
-	raw_spin_unlock(&console_owner_lock);
-
-	if (!waiter) {
-		spin_release(&console_owner_dep_map, _THIS_IP_);
-		return 0;
-	}
-
-	/* The waiter is now free to continue */
-	WRITE_ONCE(console_waiter, false);
-
-	spin_release(&console_owner_dep_map, _THIS_IP_);
-
-	/*
-	 * Hand off console_lock to waiter. The waiter will perform
-	 * the up(). After this, the waiter is the console_lock owner.
-	 */
-	mutex_release(&console_lock_dep_map, _THIS_IP_);
-	return 1;
-}
-
-/**
- * console_trylock_spinning - try to get console_lock by busy waiting
- *
- * This allows to busy wait for the console_lock when the current
- * owner is running in specially marked sections. It means that
- * the current owner is running and cannot reschedule until it
- * is ready to lose the lock.
- *
- * Return: 1 if we got the lock, 0 othrewise
- */
-static int console_trylock_spinning(void)
-{
-	struct task_struct *owner = NULL;
-	bool waiter;
-	bool spin = false;
-	unsigned long flags;
-
-	if (console_trylock())
-		return 1;
-
-	printk_safe_enter_irqsave(flags);
-
-	raw_spin_lock(&console_owner_lock);
-	owner = READ_ONCE(console_owner);
-	waiter = READ_ONCE(console_waiter);
-	if (!waiter && owner && owner != current) {
-		WRITE_ONCE(console_waiter, true);
-		spin = true;
-	}
-	raw_spin_unlock(&console_owner_lock);
-
-	/*
-	 * If there is an active printk() writing to the
-	 * consoles, instead of having it write our data too,
-	 * see if we can offload that load from the active
-	 * printer, and do some printing ourselves.
-	 * Go into a spin only if there isn't already a waiter
-	 * spinning, and there is an active printer, and
-	 * that active printer isn't us (recursive printk?).
-	 */
-	if (!spin) {
-		printk_safe_exit_irqrestore(flags);
-		return 0;
-	}
-
-	/* We spin waiting for the owner to release us */
-	spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_);
-	/* Owner will clear console_waiter on hand off */
-	while (READ_ONCE(console_waiter))
-		cpu_relax();
-	spin_release(&console_owner_dep_map, _THIS_IP_);
-
-	printk_safe_exit_irqrestore(flags);
-	/*
-	 * The owner passed the console lock to us.
-	 * Since we did not spin on console lock, annotate
-	 * this as a trylock. Otherwise lockdep will
-	 * complain.
-	 */
-	mutex_acquire(&console_lock_dep_map, 0, 1, _THIS_IP_);
-
-	return 1;
-}
-
-/*
- * Call the console drivers, asking them to write out
- * log_buf[start] to log_buf[end - 1].
- * The console_lock must be held.
- */
-static void call_console_drivers(const char *ext_text, size_t ext_len,
-				 const char *text, size_t len)
-{
-	static char dropped_text[64];
-	size_t dropped_len = 0;
-	struct console *con;
-
-	trace_console_rcuidle(text, len);
-
-	if (!console_drivers)
-		return;
-
-	if (console_dropped) {
-		dropped_len = snprintf(dropped_text, sizeof(dropped_text),
-				       "** %lu printk messages dropped **\n",
-				       console_dropped);
-		console_dropped = 0;
-	}
-
-	for_each_console(con) {
-		if (exclusive_console && con != exclusive_console)
-			continue;
-		if (!(con->flags & CON_ENABLED))
-			continue;
-		if (!con->write)
-			continue;
-		if (!cpu_online(smp_processor_id()) &&
-		    !(con->flags & CON_ANYTIME))
-			continue;
-		if (kernel_sync_mode())
-			continue;
-		if (con->flags & CON_EXTENDED)
-			con->write(con, ext_text, ext_len);
-		else {
-			if (dropped_len)
-				con->write(con, dropped_text, dropped_len);
-			con->write(con, text, len);
-		}
-	}
-}
-
-/*
  * Recursion is tracked separately on each CPU. If NMIs are supported, an
  * additional NMI context per CPU is also separately tracked. Until per-CPU
  * is available, a separate "early tracking" is performed.
@@ -2354,7 +2143,7 @@ out:
 
 		for_each_console(con) {
 			if (console_may_sync(con))
-				print_sync_until(con, seq + 1);
+				print_sync_until(con, seq + 1, false);
 		}
 	}
 
@@ -2367,39 +2156,16 @@ asmlinkage int vprintk_emit(int facility
 			    const char *fmt, va_list args)
 {
 	int printed_len;
-	bool in_sched = false;
 
 	/* Suppress unimportant messages after panic happens */
 	if (unlikely(suppress_printk))
 		return 0;
 
-	if (level == LOGLEVEL_SCHED) {
+	if (level == LOGLEVEL_SCHED)
 		level = LOGLEVEL_DEFAULT;
-		in_sched = true;
-	}
-
-	printk_delay(level);
 
 	printed_len = vprintk_store(facility, level, dev_info, fmt, args);
 
-	/* If called from the scheduler, we can not call up(). */
-	if (!in_sched) {
-		/*
-		 * Disable preemption to avoid being preempted while holding
-		 * console_sem which would prevent anyone from printing to
-		 * console
-		 */
-		preempt_disable();
-		/*
-		 * Try to acquire and then immediately release the console
-		 * semaphore.  The release will print out buffers and wake up
-		 * /dev/kmsg and syslog() users.
-		 */
-		if (console_trylock_spinning())
-			console_unlock();
-		preempt_enable();
-	}
-
 	wake_up_klogd();
 	return printed_len;
 }
@@ -2424,37 +2190,162 @@ asmlinkage __visible int _printk(const c
 }
 EXPORT_SYMBOL(_printk);
 
-#else /* CONFIG_PRINTK */
+static int printk_kthread_func(void *data)
+{
+	struct console *con = data;
+	unsigned long dropped = 0;
+	char *dropped_text = NULL;
+	struct printk_info info;
+	struct printk_record r;
+	char *ext_text = NULL;
+	size_t dropped_len;
+	int ret = -ENOMEM;
+	char *text = NULL;
+	char *write_text;
+	size_t len;
+	int error;
+	u64 seq;
+
+	if (con->flags & CON_EXTENDED) {
+		ext_text = kmalloc(CONSOLE_EXT_LOG_MAX, GFP_KERNEL);
+		if (!ext_text)
+			goto out;
+	}
+	text = kmalloc(LOG_LINE_MAX + PREFIX_MAX, GFP_KERNEL);
+	dropped_text = kmalloc(64, GFP_KERNEL);
+	if (!text || !dropped_text)
+		goto out;
+	if (con->flags & CON_EXTENDED)
+		write_text = ext_text;
+	else
+		write_text = text;
+
+	seq = read_console_seq(con);
+
+	prb_rec_init_rd(&r, &info, text, LOG_LINE_MAX + PREFIX_MAX);
 
-#define printk_time		false
+	for (;;) {
+		error = wait_event_interruptible(log_wait,
+						 prb_read_valid(prb, seq, &r) || kthread_should_stop());
 
-#define prb_read_valid(rb, seq, r)	false
-#define prb_first_valid_seq(rb)		0
-#define read_console_seq()		0
-#define latched_seq_write(dst, src)
-#define kernel_sync_mode()		false
+		if (kthread_should_stop())
+			break;
 
-static u64 exclusive_console_stop_seq;
-static unsigned long console_dropped;
+		if (error)
+			continue;
 
-static size_t record_print_text(const struct printk_record *r,
-				bool syslog, bool time)
+		if (seq != r.info->seq) {
+			dropped += r.info->seq - seq;
+			seq = r.info->seq;
+		}
+
+		seq++;
+
+		if (!(con->flags & CON_ENABLED))
+			continue;
+
+		if (suppress_message_printing(r.info->level))
+			continue;
+
+		if (con->flags & CON_EXTENDED) {
+			len = info_print_ext_header(ext_text,
+						    CONSOLE_EXT_LOG_MAX,
+						    r.info);
+			len += msg_print_ext_body(ext_text + len,
+						  CONSOLE_EXT_LOG_MAX - len,
+						  &r.text_buf[0], r.info->text_len,
+						  &r.info->dev_info);
+		} else {
+			len = record_print_text(&r,
+						console_msg_format & MSG_FORMAT_SYSLOG,
+						printk_time);
+		}
+
+		console_lock();
+
+		/*
+		 * Even though the printk kthread is always preemptible, it is
+		 * still not allowed to call cond_resched() from within
+		 * console drivers. The task may become non-preemptible in the
+		 * console driver call chain. For example, vt_console_print()
+		 * takes a spinlock and then can call into fbcon_redraw(),
+		 * which can conditionally invoke cond_resched().
+		 */
+		console_may_schedule = 0;
+
+		if (kernel_sync_mode() && con->write_atomic) {
+			console_unlock();
+			break;
+		}
+
+		if (!(con->flags & CON_EXTENDED) && dropped) {
+			dropped_len = snprintf(dropped_text, 64,
+					       "** %lu printk messages dropped **\n",
+					       dropped);
+			dropped = 0;
+
+			con->write(con, dropped_text, dropped_len);
+			printk_delay(r.info->level);
+		}
+
+		con->write(con, write_text, len);
+		if (len)
+			printk_delay(r.info->level);
+
+		latched_seq_write(&con->printk_seq, seq);
+
+		console_unlock();
+	}
+	ret = 0;
+out:
+	kfree(dropped_text);
+	kfree(text);
+	kfree(ext_text);
+	pr_info("%sconsole [%s%d]: printing thread stopped\n",
+		(con->flags & CON_BOOT) ? "boot" : "",
+		con->name, con->index);
+	return ret;
+}
+
+/* Must be called within console_lock(). */
+static void start_printk_kthread(struct console *con)
 {
-	return 0;
+	con->thread = kthread_run(printk_kthread_func, con,
+				  "pr/%s%d", con->name, con->index);
+	if (IS_ERR(con->thread)) {
+		pr_err("%sconsole [%s%d]: unable to start printing thread\n",
+		       (con->flags & CON_BOOT) ? "boot" : "",
+		       con->name, con->index);
+		return;
+	}
+	pr_info("%sconsole [%s%d]: printing thread started\n",
+		(con->flags & CON_BOOT) ? "boot" : "",
+		con->name, con->index);
 }
-static ssize_t info_print_ext_header(char *buf, size_t size,
-				     struct printk_info *info)
+
+/* protected by console_lock */
+static bool kthreads_started;
+
+/* Must be called within console_lock(). */
+static void console_try_thread(struct console *con)
 {
-	return 0;
+	if (kthreads_started) {
+		start_printk_kthread(con);
+		return;
+	}
+
+	/*
+	 * The printing threads have not been started yet. If this console
+	 * can print synchronously, print all unprinted messages.
+	 */
+	if (console_may_sync(con)) {
+		unsigned long flags;
+
+		local_irq_save(flags);
+		print_sync_until(con, prb_next_seq(prb), true);
+		local_irq_restore(flags);
+	}
 }
-static ssize_t msg_print_ext_body(char *buf, size_t size,
-				  char *text, size_t text_len,
-				  struct dev_printk_info *dev_info) { return 0; }
-static void console_lock_spinning_enable(void) { }
-static int console_lock_spinning_disable_and_check(void) { return 0; }
-static void call_console_drivers(const char *ext_text, size_t ext_len,
-				 const char *text, size_t len) {}
-static bool suppress_message_printing(int level) { return false; }
 
 #endif /* CONFIG_PRINTK */
 
@@ -2711,36 +2602,6 @@ int is_console_locked(void)
 }
 EXPORT_SYMBOL(is_console_locked);
 
-/*
- * Check if we have any console that is capable of printing while cpu is
- * booting or shutting down. Requires console_sem.
- */
-static int have_callable_console(void)
-{
-	struct console *con;
-
-	for_each_console(con)
-		if ((con->flags & CON_ENABLED) &&
-				(con->flags & CON_ANYTIME))
-			return 1;
-
-	return 0;
-}
-
-/*
- * Can we actually use the console at this time on this cpu?
- *
- * Console drivers may assume that per-cpu resources have been allocated. So
- * unless they're explicitly marked as being able to cope (CON_ANYTIME) don't
- * call them until this CPU is officially up.
- */
-static inline int can_use_console(void)
-{
-	if (kernel_sync_mode())
-		return false;
-	return cpu_online(raw_smp_processor_id()) || have_callable_console();
-}
-
 /**
  * console_unlock - unlock the console system
  *
@@ -2757,139 +2618,13 @@ static inline int can_use_console(void)
  */
 void console_unlock(void)
 {
-	static char ext_text[CONSOLE_EXT_LOG_MAX];
-	static char text[CONSOLE_LOG_MAX];
-	unsigned long flags;
-	bool do_cond_resched, retry;
-	struct printk_info info;
-	struct printk_record r;
-	u64 seq;
-
 	if (console_suspended) {
 		up_console_sem();
 		return;
 	}
 
-	prb_rec_init_rd(&r, &info, text, sizeof(text));
-
-	/*
-	 * Console drivers are called with interrupts disabled, so
-	 * @console_may_schedule should be cleared before; however, we may
-	 * end up dumping a lot of lines, for example, if called from
-	 * console registration path, and should invoke cond_resched()
-	 * between lines if allowable.  Not doing so can cause a very long
-	 * scheduling stall on a slow console leading to RCU stall and
-	 * softlockup warnings which exacerbate the issue with more
-	 * messages practically incapacitating the system.
-	 *
-	 * console_trylock() is not able to detect the preemptive
-	 * context reliably. Therefore the value must be stored before
-	 * and cleared after the "again" goto label.
-	 */
-	do_cond_resched = console_may_schedule;
-again:
-	console_may_schedule = 0;
-
-	/*
-	 * We released the console_sem lock, so we need to recheck if
-	 * cpu is online and (if not) is there at least one CON_ANYTIME
-	 * console.
-	 */
-	if (!can_use_console()) {
-		console_locked = 0;
-		up_console_sem();
-		return;
-	}
-
-	for (;;) {
-		size_t ext_len = 0;
-		int handover;
-		size_t len;
-
-skip:
-		seq = read_console_seq();
-		if (!prb_read_valid(prb, seq, &r))
-			break;
-
-		if (seq != r.info->seq) {
-			console_dropped += r.info->seq - seq;
-			latched_seq_write(&console_seq, r.info->seq);
-			seq = r.info->seq;
-		}
-
-		if (suppress_message_printing(r.info->level)) {
-			/*
-			 * Skip record we have buffered and already printed
-			 * directly to the console when we received it, and
-			 * record that has level above the console loglevel.
-			 */
-			latched_seq_write(&console_seq, seq + 1);
-			goto skip;
-		}
-
-		/* Output to all consoles once old messages replayed. */
-		if (unlikely(exclusive_console &&
-			     seq >= exclusive_console_stop_seq)) {
-			exclusive_console = NULL;
-		}
-
-		/*
-		 * Handle extended console text first because later
-		 * record_print_text() will modify the record buffer in-place.
-		 */
-		if (nr_ext_console_drivers) {
-			ext_len = info_print_ext_header(ext_text,
-						sizeof(ext_text),
-						r.info);
-			ext_len += msg_print_ext_body(ext_text + ext_len,
-						sizeof(ext_text) - ext_len,
-						&r.text_buf[0],
-						r.info->text_len,
-						&r.info->dev_info);
-		}
-		len = record_print_text(&r,
-				console_msg_format & MSG_FORMAT_SYSLOG,
-				printk_time);
-		latched_seq_write(&console_seq, seq + 1);
-
-		/*
-		 * While actively printing out messages, if another printk()
-		 * were to occur on another CPU, it may wait for this one to
-		 * finish. This task can not be preempted if there is a
-		 * waiter waiting to take over.
-		 *
-		 * Interrupts are disabled because the hand over to a waiter
-		 * must not be interrupted until the hand over is completed
-		 * (@console_waiter is cleared).
-		 */
-		printk_safe_enter_irqsave(flags);
-		console_lock_spinning_enable();
-
-		stop_critical_timings();	/* don't trace print latency */
-		call_console_drivers(ext_text, ext_len, text, len);
-		start_critical_timings();
-
-		handover = console_lock_spinning_disable_and_check();
-		printk_safe_exit_irqrestore(flags);
-		if (handover)
-			return;
-
-		if (do_cond_resched)
-			cond_resched();
-	}
-
 	console_locked = 0;
 	up_console_sem();
-
-	/*
-	 * Someone could have filled up the buffer again, so re-check if there's
-	 * something to flush. In case we cannot trylock the console_sem again,
-	 * there's a new owner and the console_unlock() from them will do the
-	 * flush, no worries.
-	 */
-	retry = prb_read_valid(prb, read_console_seq(), NULL);
-	if (retry && console_trylock())
-		goto again;
 }
 EXPORT_SYMBOL(console_unlock);
 
@@ -2939,19 +2674,20 @@ void console_unblank(void)
  */
 void console_flush_on_panic(enum con_flush_mode mode)
 {
-	if (console_trylock()) {
-		if (mode == CONSOLE_REPLAY_ALL)
-			latched_seq_write(&console_seq, prb_first_valid_seq(prb));
-	} else {
-		/*
-		 * Another context is holding the console lock and
-		 * @console_may_schedule may be set. Ignore and proceed to
-		 * unlock so that messages are flushed out. As this can be
-		 * called from any context and we don't want to get preempted
-		 * while flushing, ensure @console_may_schedule is cleared.
-		 */
-		console_may_schedule = 0;
+	if (!console_trylock())
+		return;
+
+#ifdef CONFIG_PRINTK
+	if (mode == CONSOLE_REPLAY_ALL) {
+		struct console *c;
+		u64 seq;
+
+		seq = prb_first_valid_seq(prb);
+		for_each_console(c)
+			latched_seq_write(&c->printk_seq, seq);
 	}
+#endif
+
 	console_unlock();
 }
 
@@ -3087,6 +2823,7 @@ static int try_enable_new_console(struct
 void register_console(struct console *newcon)
 {
 	struct console *bcon = NULL;
+	u64 __maybe_unused seq = 0;
 	int err;
 
 	for_each_console(bcon) {
@@ -3109,6 +2846,8 @@ void register_console(struct console *ne
 		}
 	}
 
+	newcon->thread = NULL;
+
 	if (console_drivers && console_drivers->flags & CON_BOOT)
 		bcon = console_drivers;
 
@@ -3173,27 +2912,21 @@ void register_console(struct console *ne
 	if (newcon->flags & CON_EXTENDED)
 		nr_ext_console_drivers++;
 
-	if (newcon->flags & CON_PRINTBUFFER) {
-		/*
-		 * console_unlock(); will print out the buffered messages
-		 * for us.
-		 *
-		 * We're about to replay the log buffer.  Only do this to the
-		 * just-registered console to avoid excessive message spam to
-		 * the already-registered consoles.
-		 *
-		 * Set exclusive_console with disabled interrupts to reduce
-		 * race window with eventual console_flush_on_panic() that
-		 * ignores console_lock.
-		 */
-		exclusive_console = newcon;
-		exclusive_console_stop_seq = read_console_seq();
+#ifdef CONFIG_PRINTK
+	if (!(newcon->flags & CON_PRINTBUFFER))
+		seq = prb_next_seq(prb);
 
-		/* Get a consistent copy of @syslog_seq. */
-		mutex_lock(&syslog_lock);
-		latched_seq_write(&console_seq, syslog_seq);
-		mutex_unlock(&syslog_lock);
-	}
+	seqcount_latch_init(&newcon->printk_seq.latch);
+	latched_seq_write(&newcon->printk_seq, seq);
+	seqcount_latch_init(&newcon->printk_sync_seq.latch);
+	latched_seq_write(&newcon->printk_sync_seq, seq);
+#ifdef CONFIG_HAVE_NMI
+	seqcount_latch_init(&newcon->printk_sync_nmi_seq.latch);
+	latched_seq_write(&newcon->printk_sync_nmi_seq, seq);
+#endif
+
+	console_try_thread(newcon);
+#endif /* CONFIG_PRINTK */
 	console_unlock();
 	console_sysfs_notify();
 
@@ -3267,6 +3000,9 @@ int unregister_console(struct console *c
 	console_unlock();
 	console_sysfs_notify();
 
+	if (console->thread && !IS_ERR(console->thread))
+		kthread_stop(console->thread);
+
 	if (console->exit)
 		res = console->exit(console);
 
@@ -3349,6 +3085,15 @@ static int __init printk_late_init(void)
 			unregister_console(con);
 		}
 	}
+
+#ifdef CONFIG_PRINTK
+	console_lock();
+	for_each_console(con)
+		start_printk_kthread(con);
+	kthreads_started = true;
+	console_unlock();
+#endif
+
 	ret = cpuhp_setup_state_nocalls(CPUHP_PRINTK_DEAD, "printk:dead", NULL,
 					console_cpu_notify);
 	WARN_ON(ret < 0);
@@ -3364,7 +3109,6 @@ late_initcall(printk_late_init);
  * Delayed printk version, for scheduler-internal messages:
  */
 #define PRINTK_PENDING_WAKEUP	0x01
-#define PRINTK_PENDING_OUTPUT	0x02
 
 static DEFINE_PER_CPU(int, printk_pending);
 
@@ -3372,14 +3116,8 @@ static void wake_up_klogd_work_func(stru
 {
 	int pending = __this_cpu_xchg(printk_pending, 0);
 
-	if (pending & PRINTK_PENDING_OUTPUT) {
-		/* If trylock fails, someone else is doing the printing */
-		if (console_trylock())
-			console_unlock();
-	}
-
 	if (pending & PRINTK_PENDING_WAKEUP)
-		wake_up_interruptible(&log_wait);
+		wake_up_interruptible_all(&log_wait);
 }
 
 static DEFINE_PER_CPU(struct irq_work, wake_up_klogd_work) =
@@ -3400,18 +3138,11 @@ void wake_up_klogd(void)
 
 void defer_console_output(void)
 {
-	if (!printk_percpu_data_ready())
-		return;
-
-	preempt_disable();
-	__this_cpu_or(printk_pending, PRINTK_PENDING_OUTPUT);
-	irq_work_queue(this_cpu_ptr(&wake_up_klogd_work));
-	preempt_enable();
 }
 
 void printk_trigger_flush(void)
 {
-	defer_console_output();
+	wake_up_klogd();
 }
 
 int vprintk_deferred(const char *fmt, va_list args)