From: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Date: Tue, 19 Jul 2022 20:08:01 +0200
Subject: [PATCH] printk: Bring back the RT bits.

This is a revert of the commits:
| 07a22b61946f0 Revert "printk: add functions to prefer direct printing"
| 5831788afb17b Revert "printk: add kthread console printers"
| 2d9ef940f89e0 Revert "printk: extend console_lock for per-console locking"
| 007eeab7e9f03 Revert "printk: remove @console_locked"
| 05c96b3713aa2 Revert "printk: Block console kthreads when direct printing will be required"
| 20fb0c8272bbb Revert "printk: Wait for the global console lock when the system is going down"

which is needed for the atomic consoles which are used on PREEMPT_RT.

Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
---
 drivers/tty/sysrq.c         |    2 
 include/linux/console.h     |   17 +
 include/linux/printk.h      |   16 +
 kernel/hung_task.c          |   11 
 kernel/panic.c              |    6 
 kernel/printk/internal.h    |    2 
 kernel/printk/printk.c      |  593 +++++++++++++++++++++++++++++++++++++++-----
 kernel/printk/printk_safe.c |   32 ++
 kernel/rcu/tree_stall.h     |    2 
 kernel/reboot.c             |   16 +
 kernel/watchdog.c           |    4 
 kernel/watchdog_hld.c       |    4 
 12 files changed, 640 insertions(+), 65 deletions(-)

--- a/drivers/tty/sysrq.c
+++ b/drivers/tty/sysrq.c
@@ -581,6 +581,7 @@ void __handle_sysrq(int key, bool check_
 
 	rcu_sysrq_start();
 	rcu_read_lock();
+	printk_prefer_direct_enter();
 	/*
 	 * Raise the apparent loglevel to maximum so that the sysrq header
 	 * is shown to provide the user with positive feedback.  We do not
@@ -622,6 +623,7 @@ void __handle_sysrq(int key, bool check_
 		pr_cont("\n");
 		console_loglevel = orig_log_level;
 	}
+	printk_prefer_direct_exit();
 	rcu_read_unlock();
 	rcu_sysrq_end();
 
--- a/include/linux/console.h
+++ b/include/linux/console.h
@@ -16,6 +16,7 @@
 
 #include <linux/atomic.h>
 #include <linux/types.h>
+#include <linux/mutex.h>
 
 struct vc_data;
 struct console_font_op;
@@ -153,6 +154,22 @@ struct console {
 	uint	ospeed;
 	u64	seq;
 	unsigned long dropped;
+	struct task_struct *thread;
+	bool	blocked;
+
+	/*
+	 * The per-console lock is used by printing kthreads to synchronize
+	 * this console with callers of console_lock(). This is necessary in
+	 * order to allow printing kthreads to run in parallel to each other,
+	 * while each safely accessing the @blocked field and synchronizing
+	 * against direct printing via console_lock/console_unlock.
+	 *
+	 * Note: For synchronizing against direct printing via
+	 *       console_trylock/console_unlock, see the static global
+	 *       variable @console_kthreads_active.
+	 */
+	struct mutex lock;
+
 	void	*data;
 	struct	 console *next;
 };
--- a/include/linux/printk.h
+++ b/include/linux/printk.h
@@ -169,7 +169,11 @@ extern void __printk_safe_exit(void);
 #define printk_deferred_enter __printk_safe_enter
 #define printk_deferred_exit __printk_safe_exit
 
+extern void printk_prefer_direct_enter(void);
+extern void printk_prefer_direct_exit(void);
+
 extern bool pr_flush(int timeout_ms, bool reset_on_progress);
+extern void try_block_console_kthreads(int timeout_ms);
 
 /*
  * Please don't use printk_ratelimit(), because it shares ratelimiting state
@@ -221,11 +225,23 @@ static inline void printk_deferred_exit(
 {
 }
 
+static inline void printk_prefer_direct_enter(void)
+{
+}
+
+static inline void printk_prefer_direct_exit(void)
+{
+}
+
 static inline bool pr_flush(int timeout_ms, bool reset_on_progress)
 {
 	return true;
 }
 
+static inline void try_block_console_kthreads(int timeout_ms)
+{
+}
+
 static inline int printk_ratelimit(void)
 {
 	return 0;
--- a/kernel/hung_task.c
+++ b/kernel/hung_task.c
@@ -127,6 +127,8 @@ static void check_hung_task(struct task_
 	 * complain:
 	 */
 	if (sysctl_hung_task_warnings) {
+		printk_prefer_direct_enter();
+
 		if (sysctl_hung_task_warnings > 0)
 			sysctl_hung_task_warnings--;
 		pr_err("INFO: task %s:%d blocked for more than %ld seconds.\n",
@@ -142,6 +144,8 @@ static void check_hung_task(struct task_
 
 		if (sysctl_hung_task_all_cpu_backtrace)
 			hung_task_show_all_bt = true;
+
+		printk_prefer_direct_exit();
 	}
 
 	touch_nmi_watchdog();
@@ -204,12 +208,17 @@ static void check_hung_uninterruptible_t
 	}
  unlock:
 	rcu_read_unlock();
-	if (hung_task_show_lock)
+	if (hung_task_show_lock) {
+		printk_prefer_direct_enter();
 		debug_show_all_locks();
+		printk_prefer_direct_exit();
+	}
 
 	if (hung_task_show_all_bt) {
 		hung_task_show_all_bt = false;
+		printk_prefer_direct_enter();
 		trigger_all_cpu_backtrace();
+		printk_prefer_direct_exit();
 	}
 
 	if (hung_task_call_panic)
--- a/kernel/panic.c
+++ b/kernel/panic.c
@@ -297,6 +297,7 @@ void panic(const char *fmt, ...)
 		 * unfortunately means it may not be hardened to work in a
 		 * panic situation.
 		 */
+		try_block_console_kthreads(10000);
 		smp_send_stop();
 	} else {
 		/*
@@ -304,6 +305,7 @@ void panic(const char *fmt, ...)
 		 * kmsg_dump, we will need architecture dependent extra
 		 * works in addition to stopping other CPUs.
 		 */
+		try_block_console_kthreads(10000);
 		crash_smp_send_stop();
 	}
 
@@ -604,6 +606,8 @@ void __warn(const char *file, int line,
 {
 	disable_trace_on_warning();
 
+	printk_prefer_direct_enter();
+
 	if (file)
 		pr_warn("WARNING: CPU: %d PID: %d at %s:%d %pS\n",
 			raw_smp_processor_id(), current->pid, file, line,
@@ -633,6 +637,8 @@ void __warn(const char *file, int line,
 
 	/* Just a warning, don't kill lockdep. */
 	add_taint(taint, LOCKDEP_STILL_OK);
+
+	printk_prefer_direct_exit();
 }
 
 #ifndef __WARN_FLAGS
--- a/kernel/printk/internal.h
+++ b/kernel/printk/internal.h
@@ -20,6 +20,8 @@ enum printk_info_flags {
 	LOG_CONT	= 8,	/* text is a fragment of a continuation line */
 };
 
+extern bool block_console_kthreads;
+
 __printf(4, 0)
 int vprintk_store(int facility, int level,
 		  const struct dev_printk_info *dev_info,
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -224,6 +224,36 @@ int devkmsg_sysctl_set_loglvl(struct ctl
 static int nr_ext_console_drivers;
 
 /*
+ * Used to synchronize printing kthreads against direct printing via
+ * console_trylock/console_unlock.
+ *
+ * Values:
+ * -1 = console kthreads atomically blocked (via global trylock)
+ *  0 = no kthread printing, console not locked (via trylock)
+ * >0 = kthread(s) actively printing
+ *
+ * Note: For synchronizing against direct printing via
+ *       console_lock/console_unlock, see the @lock variable in
+ *       struct console.
+ */
+static atomic_t console_kthreads_active = ATOMIC_INIT(0);
+
+#define console_kthreads_atomic_tryblock() \
+	(atomic_cmpxchg(&console_kthreads_active, 0, -1) == 0)
+#define console_kthreads_atomic_unblock() \
+	atomic_cmpxchg(&console_kthreads_active, -1, 0)
+#define console_kthreads_atomically_blocked() \
+	(atomic_read(&console_kthreads_active) == -1)
+
+#define console_kthread_printing_tryenter() \
+	atomic_inc_unless_negative(&console_kthreads_active)
+#define console_kthread_printing_exit() \
+	atomic_dec(&console_kthreads_active)
+
+/* Block console kthreads to avoid processing new messages. */
+bool block_console_kthreads;
+
+/*
  * Helper macros to handle lockdep when locking/unlocking console_sem. We use
  * macros instead of functions so that _RET_IP_ contains useful information.
  */
@@ -271,14 +301,49 @@ static bool panic_in_progress(void)
 }
 
 /*
- * This is used for debugging the mess that is the VT code by
- * keeping track if we have the console semaphore held. It's
- * definitely not the perfect debug tool (we don't know if _WE_
- * hold it and are racing, but it helps tracking those weird code
- * paths in the console code where we end up in places I want
- * locked without the console semaphore held).
+ * Tracks whether kthread printers are all blocked. A value of true implies
+ * that the console is locked via console_lock() or the console is suspended.
+ * Writing to this variable requires holding @console_sem.
+ */
+static bool console_kthreads_blocked;
+
+/*
+ * Block all kthread printers from a schedulable context.
+ *
+ * Requires holding @console_sem.
+ */
+static void console_kthreads_block(void)
+{
+	struct console *con;
+
+	for_each_console(con) {
+		mutex_lock(&con->lock);
+		con->blocked = true;
+		mutex_unlock(&con->lock);
+	}
+
+	console_kthreads_blocked = true;
+}
+
+/*
+ * Unblock all kthread printers from a schedulable context.
+ *
+ * Requires holding @console_sem.
  */
-static int console_locked, console_suspended;
+static void console_kthreads_unblock(void)
+{
+	struct console *con;
+
+	for_each_console(con) {
+		mutex_lock(&con->lock);
+		con->blocked = false;
+		mutex_unlock(&con->lock);
+	}
+
+	console_kthreads_blocked = false;
+}
+
+static int console_suspended;
 
 /*
  *	Array of consoles built from command line options (console=)
@@ -361,7 +426,75 @@ static int console_msg_format = MSG_FORM
 /* syslog_lock protects syslog_* variables and write access to clear_seq. */
 static DEFINE_MUTEX(syslog_lock);
 
+/*
+ * A flag to signify if printk_activate_kthreads() has already started the
+ * kthread printers. If true, any later registered consoles must start their
+ * own kthread directly. The flag is write protected by the console_lock.
+ */
+static bool printk_kthreads_available;
+
 #ifdef CONFIG_PRINTK
+static atomic_t printk_prefer_direct = ATOMIC_INIT(0);
+
+/**
+ * printk_prefer_direct_enter - cause printk() calls to attempt direct
+ *                              printing to all enabled consoles
+ *
+ * Since it is not possible to call into the console printing code from any
+ * context, there is no guarantee that direct printing will occur.
+ *
+ * This globally effects all printk() callers.
+ *
+ * Context: Any context.
+ */
+void printk_prefer_direct_enter(void)
+{
+	atomic_inc(&printk_prefer_direct);
+}
+
+/**
+ * printk_prefer_direct_exit - restore printk() behavior
+ *
+ * Context: Any context.
+ */
+void printk_prefer_direct_exit(void)
+{
+	WARN_ON(atomic_dec_if_positive(&printk_prefer_direct) < 0);
+}
+
+/*
+ * Calling printk() always wakes kthread printers so that they can
+ * flush the new message to their respective consoles. Also, if direct
+ * printing is allowed, printk() tries to flush the messages directly.
+ *
+ * Direct printing is allowed in situations when the kthreads
+ * are not available or the system is in a problematic state.
+ *
+ * See the implementation about possible races.
+ */
+static inline bool allow_direct_printing(void)
+{
+	/*
+	 * Checking kthread availability is a possible race because the
+	 * kthread printers can become permanently disabled during runtime.
+	 * However, doing that requires holding the console_lock, so any
+	 * pending messages will be direct printed by console_unlock().
+	 */
+	if (!printk_kthreads_available)
+		return true;
+
+	/*
+	 * Prefer direct printing when the system is in a problematic state.
+	 * The context that sets this state will always see the updated value.
+	 * The other contexts do not care. Anyway, direct printing is just a
+	 * best effort. The direct output is only possible when console_lock
+	 * is not already taken and no kthread printers are actively printing.
+	 */
+	return (system_state > SYSTEM_RUNNING ||
+		oops_in_progress ||
+		atomic_read(&printk_prefer_direct));
+}
+
 DECLARE_WAIT_QUEUE_HEAD(log_wait);
 /* All 3 protected by @syslog_lock. */
 /* the next printk record to read by syslog(READ) or /proc/kmsg */
@@ -2252,10 +2385,10 @@ asmlinkage int vprintk_emit(int facility
 	printed_len = vprintk_store(facility, level, dev_info, fmt, args);
 
 	/* If called from the scheduler, we can not call up(). */
-	if (!in_sched) {
+	if (!in_sched && allow_direct_printing()) {
 		/*
 		 * The caller may be holding system-critical or
-		 * timing-sensitive locks. Disable preemption during
+		 * timing-sensitive locks. Disable preemption during direct
 		 * printing of all remaining records to all consoles so that
 		 * this context can return as soon as possible. Hopefully
 		 * another printk() caller will take over the printing.
@@ -2298,6 +2431,8 @@ EXPORT_SYMBOL(_printk);
 
 static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progress);
 
+static void printk_start_kthread(struct console *con);
+
 #else /* CONFIG_PRINTK */
 
 #define CONSOLE_LOG_MAX		0
@@ -2331,6 +2466,8 @@ static void call_console_driver(struct c
 }
 static bool suppress_message_printing(int level) { return false; }
 static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progress) { return true; }
+static void printk_start_kthread(struct console *con) { }
+static bool allow_direct_printing(void) { return true; }
 
 #endif /* CONFIG_PRINTK */
 
@@ -2549,6 +2686,14 @@ static int console_cpu_notify(unsigned i
 		/* If trylock fails, someone else is doing the printing */
 		if (console_trylock())
 			console_unlock();
+		else {
+			/*
+			 * If a new CPU comes online, the conditions for
+			 * printer_should_wake() may have changed for some
+			 * kthread printer with !CON_ANYTIME.
+			 */
+			wake_up_klogd();
+		}
 	}
 	return 0;
 }
@@ -2568,7 +2713,7 @@ void console_lock(void)
 	down_console_sem();
 	if (console_suspended)
 		return;
-	console_locked = 1;
+	console_kthreads_block();
 	console_may_schedule = 1;
 }
 EXPORT_SYMBOL(console_lock);
@@ -2589,15 +2734,30 @@ int console_trylock(void)
 		up_console_sem();
 		return 0;
 	}
-	console_locked = 1;
+	if (!console_kthreads_atomic_tryblock()) {
+		up_console_sem();
+		return 0;
+	}
 	console_may_schedule = 0;
 	return 1;
 }
 EXPORT_SYMBOL(console_trylock);
 
+/*
+ * This is used to help to make sure that certain paths within the VT code are
+ * running with the console lock held. It is definitely not the perfect debug
+ * tool (it is not known if the VT code is the task holding the console lock),
+ * but it helps tracking those weird code paths in the console code such as
+ * when the console is suspended: where the console is not locked but no
+ * console printing may occur.
+ *
+ * Note: This returns true when the console is suspended but is not locked.
+ *       This is intentional because the VT code must consider that situation
+ *       the same as if the console was locked.
+ */
 int is_console_locked(void)
 {
-	return console_locked;
+	return (console_kthreads_blocked || atomic_read(&console_kthreads_active));
 }
 EXPORT_SYMBOL(is_console_locked);
 
@@ -2620,18 +2780,9 @@ static bool abandon_console_lock_in_pani
 	return atomic_read(&panic_cpu) != raw_smp_processor_id();
 }
 
-/*
- * Check if the given console is currently capable and allowed to print
- * records.
- *
- * Requires the console_lock.
- */
-static inline bool console_is_usable(struct console *con)
+static inline bool __console_is_usable(short flags)
 {
-	if (!(con->flags & CON_ENABLED))
-		return false;
-
-	if (!con->write)
+	if (!(flags & CON_ENABLED))
 		return false;
 
 	/*
@@ -2640,15 +2791,43 @@ static inline bool console_is_usable(str
 	 * cope (CON_ANYTIME) don't call them until this CPU is officially up.
 	 */
 	if (!cpu_online(raw_smp_processor_id()) &&
-	    !(con->flags & CON_ANYTIME))
+	    !(flags & CON_ANYTIME))
 		return false;
 
 	return true;
 }
 
+/*
+ * Check if the given console is currently capable and allowed to print
+ * records.
+ *
+ * Requires holding the console_lock.
+ */
+static inline bool console_is_usable(struct console *con)
+{
+	if (!con->write)
+		return false;
+
+	return __console_is_usable(con->flags);
+}
+
 static void __console_unlock(void)
 {
-	console_locked = 0;
+	/*
+	 * Depending on whether console_lock() or console_trylock() was used,
+	 * appropriately allow the kthread printers to continue.
+	 */
+	if (console_kthreads_blocked)
+		console_kthreads_unblock();
+	else
+		console_kthreads_atomic_unblock();
+
+	/*
+	 * New records may have arrived while the console was locked.
+	 * Wake the kthread printers to print them.
+	 */
+	wake_up_klogd();
+
 	up_console_sem();
 }
 
@@ -2666,17 +2845,19 @@ static void __console_unlock(void)
  *
  * @handover will be set to true if a printk waiter has taken over the
  * console_lock, in which case the caller is no longer holding the
- * console_lock. Otherwise it is set to false.
+ * console_lock. Otherwise it is set to false. A NULL pointer may be provided
+ * to disable allowing the console_lock to be taken over by a printk waiter.
  *
  * Returns false if the given console has no next record to print, otherwise
  * true.
  *
- * Requires the console_lock.
+ * Requires the console_lock if @handover is non-NULL.
+ * Requires con->lock otherwise.
  */
-static bool console_emit_next_record(struct console *con, char *text, char *ext_text,
-				     char *dropped_text, bool *handover)
+static bool __console_emit_next_record(struct console *con, char *text, char *ext_text,
+				       char *dropped_text, bool *handover)
 {
-	static int panic_console_dropped;
+	static atomic_t panic_console_dropped = ATOMIC_INIT(0);
 	struct printk_info info;
 	struct printk_record r;
 	unsigned long flags;
@@ -2685,7 +2866,8 @@ static bool console_emit_next_record(str
 
 	prb_rec_init_rd(&r, &info, text, CONSOLE_LOG_MAX);
 
-	*handover = false;
+	if (handover)
+		*handover = false;
 
 	if (!prb_read_valid(prb, con->seq, &r))
 		return false;
@@ -2693,7 +2875,8 @@ static bool console_emit_next_record(str
 	if (con->seq != r.info->seq) {
 		con->dropped += r.info->seq - con->seq;
 		con->seq = r.info->seq;
-		if (panic_in_progress() && panic_console_dropped++ > 10) {
+		if (panic_in_progress() &&
+		    atomic_fetch_inc_relaxed(&panic_console_dropped) > 10) {
 			suppress_panic_printk = 1;
 			pr_warn_once("Too many dropped messages. Suppress messages on non-panic CPUs to prevent livelock.\n");
 		}
@@ -2715,32 +2898,62 @@ static bool console_emit_next_record(str
 		len = record_print_text(&r, console_msg_format & MSG_FORMAT_SYSLOG, printk_time);
 	}
 
-	/*
-	 * While actively printing out messages, if another printk()
-	 * were to occur on another CPU, it may wait for this one to
-	 * finish. This task can not be preempted if there is a
-	 * waiter waiting to take over.
-	 *
-	 * Interrupts are disabled because the hand over to a waiter
-	 * must not be interrupted until the hand over is completed
-	 * (@console_waiter is cleared).
-	 */
-	printk_safe_enter_irqsave(flags);
-	console_lock_spinning_enable();
+	if (handover) {
+		/*
+		 * While actively printing out messages, if another printk()
+		 * were to occur on another CPU, it may wait for this one to
+		 * finish. This task can not be preempted if there is a
+		 * waiter waiting to take over.
+		 *
+		 * Interrupts are disabled because the hand over to a waiter
+		 * must not be interrupted until the hand over is completed
+		 * (@console_waiter is cleared).
+		 */
+		printk_safe_enter_irqsave(flags);
+		console_lock_spinning_enable();
+
+		/* don't trace irqsoff print latency */
+		stop_critical_timings();
+	}
 
-	stop_critical_timings();	/* don't trace print latency */
 	call_console_driver(con, write_text, len, dropped_text);
-	start_critical_timings();
 
 	con->seq++;
 
-	*handover = console_lock_spinning_disable_and_check();
-	printk_safe_exit_irqrestore(flags);
+	if (handover) {
+		start_critical_timings();
+		*handover = console_lock_spinning_disable_and_check();
+		printk_safe_exit_irqrestore(flags);
+	}
 skip:
 	return true;
 }
 
 /*
+ * Print a record for a given console, but allow another printk() caller to
+ * take over the console_lock and continue printing.
+ *
+ * Requires the console_lock, but depending on @handover after the call, the
+ * caller may no longer have the console_lock.
+ *
+ * See __console_emit_next_record() for argument and return details.
+ */
+static bool console_emit_next_record_transferable(struct console *con, char *text, char *ext_text,
+						  char *dropped_text, bool *handover)
+{
+	/*
+	 * Handovers are only supported if threaded printers are atomically
+	 * blocked. The context taking over the console_lock may be atomic.
+	 */
+	if (!console_kthreads_atomically_blocked()) {
+		*handover = false;
+		handover = NULL;
+	}
+
+	return __console_emit_next_record(con, text, ext_text, dropped_text, handover);
+}
+
+/*
  * Print out all remaining records to all consoles.
  *
  * @do_cond_resched is set by the caller. It can be true only in schedulable
@@ -2758,8 +2971,8 @@ static bool console_emit_next_record(str
  * were flushed to all usable consoles. A returned false informs the caller
  * that everything was not flushed (either there were no usable consoles or
  * another context has taken over printing or it is a panic situation and this
- * is not the panic CPU). Regardless the reason, the caller should assume it
- * is not useful to immediately try again.
+ * is not the panic CPU or direct printing is not preferred). Regardless the
+ * reason, the caller should assume it is not useful to immediately try again.
  *
  * Requires the console_lock.
  */
@@ -2776,6 +2989,10 @@ static bool console_flush_all(bool do_co
 	*handover = false;
 
 	do {
+		/* Let the kthread printers do the work if they can. */
+		if (!allow_direct_printing())
+			return false;
+
 		any_progress = false;
 
 		for_each_console(con) {
@@ -2787,13 +3004,11 @@ static bool console_flush_all(bool do_co
 
 			if (con->flags & CON_EXTENDED) {
 				/* Extended consoles do not print "dropped messages". */
-				progress = console_emit_next_record(con, &text[0],
-								    &ext_text[0], NULL,
-								    handover);
+				progress = console_emit_next_record_transferable(con, &text[0],
+								&ext_text[0], NULL, handover);
 			} else {
-				progress = console_emit_next_record(con, &text[0],
-								    NULL, &dropped_text[0],
-								    handover);
+				progress = console_emit_next_record_transferable(con, &text[0],
+								NULL, &dropped_text[0], handover);
 			}
 			if (*handover)
 				return false;
@@ -2908,10 +3123,13 @@ void console_unblank(void)
 	if (oops_in_progress) {
 		if (down_trylock_console_sem() != 0)
 			return;
+		if (!console_kthreads_atomic_tryblock()) {
+			up_console_sem();
+			return;
+		}
 	} else
 		console_lock();
 
-	console_locked = 1;
 	console_may_schedule = 0;
 	for_each_console(c)
 		if ((c->flags & CON_ENABLED) && c->unblank)
@@ -3190,6 +3408,10 @@ void register_console(struct console *ne
 		nr_ext_console_drivers++;
 
 	newcon->dropped = 0;
+	newcon->thread = NULL;
+	newcon->blocked = true;
+	mutex_init(&newcon->lock);
+
 	if (newcon->flags & CON_PRINTBUFFER) {
 		/* Get a consistent copy of @syslog_seq. */
 		mutex_lock(&syslog_lock);
@@ -3199,6 +3421,10 @@ void register_console(struct console *ne
 		/* Begin with next message. */
 		newcon->seq = prb_next_seq(prb);
 	}
+
+	if (printk_kthreads_available)
+		printk_start_kthread(newcon);
+
 	console_unlock();
 	console_sysfs_notify();
 
@@ -3225,6 +3451,7 @@ EXPORT_SYMBOL(register_console);
 
 int unregister_console(struct console *console)
 {
+	struct task_struct *thd;
 	struct console *con;
 	int res;
 
@@ -3265,7 +3492,20 @@ int unregister_console(struct console *c
 		console_drivers->flags |= CON_CONSDEV;
 
 	console->flags &= ~CON_ENABLED;
+
+	/*
+	 * console->thread can only be cleared under the console lock. But
+	 * stopping the thread must be done without the console lock. The
+	 * task that clears @thread is the task that stops the kthread.
+	 */
+	thd = console->thread;
+	console->thread = NULL;
+
 	console_unlock();
+
+	if (thd)
+		kthread_stop(thd);
+
 	console_sysfs_notify();
 
 	if (console->exit)
@@ -3361,6 +3601,20 @@ static int __init printk_late_init(void)
 }
 late_initcall(printk_late_init);
 
+static int __init printk_activate_kthreads(void)
+{
+	struct console *con;
+
+	console_lock();
+	printk_kthreads_available = true;
+	for_each_console(con)
+		printk_start_kthread(con);
+	console_unlock();
+
+	return 0;
+}
+early_initcall(printk_activate_kthreads);
+
 #if defined CONFIG_PRINTK
 /* If @con is specified, only wait for that console. Otherwise wait for all. */
 static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progress)
@@ -3444,11 +3698,209 @@ bool pr_flush(int timeout_ms, bool reset
 }
 EXPORT_SYMBOL(pr_flush);
 
+static void __printk_fallback_preferred_direct(void)
+{
+	printk_prefer_direct_enter();
+	pr_err("falling back to preferred direct printing\n");
+	printk_kthreads_available = false;
+}
+
+/*
+ * Enter preferred direct printing, but never exit. Mark console threads as
+ * unavailable. The system is then forever in preferred direct printing and
+ * any printing threads will exit.
+ *
+ * Must *not* be called under console_lock. Use
+ * __printk_fallback_preferred_direct() if already holding console_lock.
+ */
+static void printk_fallback_preferred_direct(void)
+{
+	console_lock();
+	__printk_fallback_preferred_direct();
+	console_unlock();
+}
+
+/*
+ * Print a record for a given console, not allowing another printk() caller
+ * to take over. This is appropriate for contexts that do not have the
+ * console_lock.
+ *
+ * See __console_emit_next_record() for argument and return details.
+ */
+static bool console_emit_next_record(struct console *con, char *text, char *ext_text,
+				     char *dropped_text)
+{
+	return __console_emit_next_record(con, text, ext_text, dropped_text, NULL);
+}
+
+static bool printer_should_wake(struct console *con, u64 seq)
+{
+	short flags;
+
+	if (kthread_should_stop() || !printk_kthreads_available)
+		return true;
+
+	if (con->blocked ||
+	    console_kthreads_atomically_blocked() ||
+	    block_console_kthreads ||
+	    system_state > SYSTEM_RUNNING ||
+	    oops_in_progress) {
+		return false;
+	}
+
+	/*
+	 * This is an unsafe read from con->flags, but a false positive is
+	 * not a problem. Worst case it would allow the printer to wake up
+	 * although it is disabled. But the printer will notice that when
+	 * attempting to print and instead go back to sleep.
+	 */
+	flags = data_race(READ_ONCE(con->flags));
+
+	if (!__console_is_usable(flags))
+		return false;
+
+	return prb_read_valid(prb, seq, NULL);
+}
+
+static int printk_kthread_func(void *data)
+{
+	struct console *con = data;
+	char *dropped_text = NULL;
+	char *ext_text = NULL;
+	u64 seq = 0;
+	char *text;
+	int error;
+
+	text = kmalloc(CONSOLE_LOG_MAX, GFP_KERNEL);
+	if (!text) {
+		con_printk(KERN_ERR, con, "failed to allocate text buffer\n");
+		printk_fallback_preferred_direct();
+		goto out;
+	}
+
+	if (con->flags & CON_EXTENDED) {
+		ext_text = kmalloc(CONSOLE_EXT_LOG_MAX, GFP_KERNEL);
+		if (!ext_text) {
+			con_printk(KERN_ERR, con, "failed to allocate ext_text buffer\n");
+			printk_fallback_preferred_direct();
+			goto out;
+		}
+	} else {
+		dropped_text = kmalloc(DROPPED_TEXT_MAX, GFP_KERNEL);
+		if (!dropped_text) {
+			con_printk(KERN_ERR, con, "failed to allocate dropped_text buffer\n");
+			printk_fallback_preferred_direct();
+			goto out;
+		}
+	}
+
+	con_printk(KERN_INFO, con, "printing thread started\n");
+
+	for (;;) {
+		/*
+		 * Guarantee this task is visible on the waitqueue before
+		 * checking the wake condition.
+		 *
+		 * The full memory barrier within set_current_state() of
+		 * prepare_to_wait_event() pairs with the full memory barrier
+		 * within wq_has_sleeper().
+		 *
+		 * This pairs with __wake_up_klogd:A.
+		 */
+		error = wait_event_interruptible(log_wait,
+				printer_should_wake(con, seq)); /* LMM(printk_kthread_func:A) */
+
+		if (kthread_should_stop() || !printk_kthreads_available)
+			break;
+
+		if (error)
+			continue;
+
+		error = mutex_lock_interruptible(&con->lock);
+		if (error)
+			continue;
+
+		if (con->blocked ||
+		    !console_kthread_printing_tryenter()) {
+			/* Another context has locked the console_lock. */
+			mutex_unlock(&con->lock);
+			continue;
+		}
+
+		/*
+		 * Although this context has not locked the console_lock, it
+		 * is known that the console_lock is not locked and it is not
+		 * possible for any other context to lock the console_lock.
+		 * Therefore it is safe to read con->flags.
+		 */
+
+		if (!__console_is_usable(con->flags)) {
+			console_kthread_printing_exit();
+			mutex_unlock(&con->lock);
+			continue;
+		}
+
+		/*
+		 * Even though the printk kthread is always preemptible, it is
+		 * still not allowed to call cond_resched() from within
+		 * console drivers. The task may become non-preemptible in the
+		 * console driver call chain. For example, vt_console_print()
+		 * takes a spinlock and then can call into fbcon_redraw(),
+		 * which can conditionally invoke cond_resched().
+		 */
+		console_may_schedule = 0;
+		console_emit_next_record(con, text, ext_text, dropped_text);
+
+		seq = con->seq;
+
+		console_kthread_printing_exit();
+
+		mutex_unlock(&con->lock);
+	}
+
+	con_printk(KERN_INFO, con, "printing thread stopped\n");
+out:
+	kfree(dropped_text);
+	kfree(ext_text);
+	kfree(text);
+
+	console_lock();
+	/*
+	 * If this kthread is being stopped by another task, con->thread will
+	 * already be NULL. That is fine. The important thing is that it is
+	 * NULL after the kthread exits.
+	 */
+	con->thread = NULL;
+	console_unlock();
+
+	return 0;
+}
+
+/* Must be called under console_lock. */
+static void printk_start_kthread(struct console *con)
+{
+	/*
+	 * Do not start a kthread if there is no write() callback. The
+	 * kthreads assume the write() callback exists.
+	 */
+	if (!con->write)
+		return;
+
+	con->thread = kthread_run(printk_kthread_func, con,
+				  "pr/%s%d", con->name, con->index);
+	if (IS_ERR(con->thread)) {
+		con->thread = NULL;
+		con_printk(KERN_ERR, con, "unable to start printing thread\n");
+		__printk_fallback_preferred_direct();
+		return;
+	}
+}
+
 /*
  * Delayed printk version, for scheduler-internal messages:
  */
-#define PRINTK_PENDING_WAKEUP	0x01
-#define PRINTK_PENDING_OUTPUT	0x02
+#define PRINTK_PENDING_WAKEUP		0x01
+#define PRINTK_PENDING_DIRECT_OUTPUT	0x02
 
 static DEFINE_PER_CPU(int, printk_pending);
 
@@ -3456,10 +3908,14 @@ static void wake_up_klogd_work_func(stru
 {
 	int pending = this_cpu_xchg(printk_pending, 0);
 
-	if (pending & PRINTK_PENDING_OUTPUT) {
+	if (pending & PRINTK_PENDING_DIRECT_OUTPUT) {
+		printk_prefer_direct_enter();
+
 		/* If trylock fails, someone else is doing the printing */
 		if (console_trylock())
 			console_unlock();
+
+		printk_prefer_direct_exit();
 	}
 
 	if (pending & PRINTK_PENDING_WAKEUP)
@@ -3484,10 +3940,11 @@ static void __wake_up_klogd(int val)
 	 * prepare_to_wait_event(), which is called after ___wait_event() adds
 	 * the waiter but before it has checked the wait condition.
 	 *
-	 * This pairs with devkmsg_read:A and syslog_print:A.
+	 * This pairs with devkmsg_read:A, syslog_print:A, and
+	 * printk_kthread_func:A.
 	 */
 	if (wq_has_sleeper(&log_wait) || /* LMM(__wake_up_klogd:A) */
-	    (val & PRINTK_PENDING_OUTPUT)) {
+	    (val & PRINTK_PENDING_DIRECT_OUTPUT)) {
 		this_cpu_or(printk_pending, val);
 		irq_work_queue(this_cpu_ptr(&wake_up_klogd_work));
 	}
@@ -3505,7 +3962,17 @@ void defer_console_output(void)
 	 * New messages may have been added directly to the ringbuffer
 	 * using vprintk_store(), so wake any waiters as well.
 	 */
-	__wake_up_klogd(PRINTK_PENDING_WAKEUP | PRINTK_PENDING_OUTPUT);
+	int val = PRINTK_PENDING_WAKEUP;
+
+	/*
+	 * Make sure that some context will print the messages when direct
+	 * printing is allowed. This happens in situations when the kthreads
+	 * may not be as reliable or perhaps unusable.
+	 */
+	if (allow_direct_printing())
+		val |= PRINTK_PENDING_DIRECT_OUTPUT;
+
+	__wake_up_klogd(val);
 }
 
 void printk_trigger_flush(void)
--- a/kernel/printk/printk_safe.c
+++ b/kernel/printk/printk_safe.c
@@ -8,7 +8,9 @@
 #include <linux/smp.h>
 #include <linux/cpumask.h>
 #include <linux/printk.h>
+#include <linux/console.h>
 #include <linux/kprobes.h>
+#include <linux/delay.h>
 
 #include "internal.h"
 
@@ -50,3 +52,33 @@ asmlinkage int vprintk(const char *fmt,
 	return vprintk_default(fmt, args);
 }
 EXPORT_SYMBOL(vprintk);
+
+/**
+ * try_block_console_kthreads() - Try to block console kthreads and
+ *	make the global console_lock() avaialble
+ *
+ * @timeout_ms:        The maximum time (in ms) to wait.
+ *
+ * Prevent console kthreads from starting processing new messages. Wait
+ * until the global console_lock() become available.
+ *
+ * Context: Can be called in any context.
+ */
+void try_block_console_kthreads(int timeout_ms)
+{
+	block_console_kthreads = true;
+
+	/* Do not wait when the console lock could not be safely taken. */
+	if (this_cpu_read(printk_context) || in_nmi())
+		return;
+
+	while (timeout_ms > 0) {
+		if (console_trylock()) {
+			console_unlock();
+			return;
+		}
+
+		udelay(1000);
+		timeout_ms -= 1;
+	}
+}
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -643,6 +643,7 @@ static void print_cpu_stall(unsigned lon
 	 * See Documentation/RCU/stallwarn.rst for info on how to debug
 	 * RCU CPU stall warnings.
 	 */
+	printk_prefer_direct_enter();
 	trace_rcu_stall_warning(rcu_state.name, TPS("SelfDetected"));
 	pr_err("INFO: %s self-detected stall on CPU\n", rcu_state.name);
 	raw_spin_lock_irqsave_rcu_node(rdp->mynode, flags);
@@ -677,6 +678,7 @@ static void print_cpu_stall(unsigned lon
 	 */
 	set_tsk_need_resched(current);
 	set_preempt_need_resched();
+	printk_prefer_direct_exit();
 }
 
 static void check_cpu_stall(struct rcu_data *rdp)
--- a/kernel/reboot.c
+++ b/kernel/reboot.c
@@ -82,6 +82,7 @@ void kernel_restart_prepare(char *cmd)
 {
 	blocking_notifier_call_chain(&reboot_notifier_list, SYS_RESTART, cmd);
 	system_state = SYSTEM_RESTART;
+	try_block_console_kthreads(10000);
 	usermodehelper_disable();
 	device_shutdown();
 }
@@ -270,6 +271,7 @@ static void kernel_shutdown_prepare(enum
 	blocking_notifier_call_chain(&reboot_notifier_list,
 		(state == SYSTEM_HALT) ? SYS_HALT : SYS_POWER_OFF, NULL);
 	system_state = state;
+	try_block_console_kthreads(10000);
 	usermodehelper_disable();
 	device_shutdown();
 }
@@ -819,9 +821,11 @@ static int __orderly_reboot(void)
 	ret = run_cmd(reboot_cmd);
 
 	if (ret) {
+		printk_prefer_direct_enter();
 		pr_warn("Failed to start orderly reboot: forcing the issue\n");
 		emergency_sync();
 		kernel_restart(NULL);
+		printk_prefer_direct_exit();
 	}
 
 	return ret;
@@ -834,6 +838,7 @@ static int __orderly_poweroff(bool force
 	ret = run_cmd(poweroff_cmd);
 
 	if (ret && force) {
+		printk_prefer_direct_enter();
 		pr_warn("Failed to start orderly shutdown: forcing the issue\n");
 
 		/*
@@ -843,6 +848,7 @@ static int __orderly_poweroff(bool force
 		 */
 		emergency_sync();
 		kernel_power_off();
+		printk_prefer_direct_exit();
 	}
 
 	return ret;
@@ -900,6 +906,8 @@ EXPORT_SYMBOL_GPL(orderly_reboot);
  */
 static void hw_failure_emergency_poweroff_func(struct work_struct *work)
 {
+	printk_prefer_direct_enter();
+
 	/*
 	 * We have reached here after the emergency shutdown waiting period has
 	 * expired. This means orderly_poweroff has not been able to shut off
@@ -916,6 +924,8 @@ static void hw_failure_emergency_powerof
 	 */
 	pr_emerg("Hardware protection shutdown failed. Trying emergency restart\n");
 	emergency_restart();
+
+	printk_prefer_direct_exit();
 }
 
 static DECLARE_DELAYED_WORK(hw_failure_emergency_poweroff_work,
@@ -954,11 +964,13 @@ void hw_protection_shutdown(const char *
 {
 	static atomic_t allow_proceed = ATOMIC_INIT(1);
 
+	printk_prefer_direct_enter();
+
 	pr_emerg("HARDWARE PROTECTION shutdown (%s)\n", reason);
 
 	/* Shutdown should be initiated only once. */
 	if (!atomic_dec_and_test(&allow_proceed))
-		return;
+		goto out;
 
 	/*
 	 * Queue a backup emergency shutdown in the event of
@@ -966,6 +978,8 @@ void hw_protection_shutdown(const char *
 	 */
 	hw_failure_emergency_poweroff(ms_until_forced);
 	orderly_poweroff(true);
+out:
+	printk_prefer_direct_exit();
 }
 EXPORT_SYMBOL_GPL(hw_protection_shutdown);
 
--- a/kernel/watchdog.c
+++ b/kernel/watchdog.c
@@ -424,6 +424,8 @@ static enum hrtimer_restart watchdog_tim
 		/* Start period for the next softlockup warning. */
 		update_report_ts();
 
+		printk_prefer_direct_enter();
+
 		pr_emerg("BUG: soft lockup - CPU#%d stuck for %us! [%s:%d]\n",
 			smp_processor_id(), duration,
 			current->comm, task_pid_nr(current));
@@ -442,6 +444,8 @@ static enum hrtimer_restart watchdog_tim
 		add_taint(TAINT_SOFTLOCKUP, LOCKDEP_STILL_OK);
 		if (softlockup_panic)
 			panic("softlockup: hung tasks");
+
+		printk_prefer_direct_exit();
 	}
 
 	return HRTIMER_RESTART;
--- a/kernel/watchdog_hld.c
+++ b/kernel/watchdog_hld.c
@@ -135,6 +135,8 @@ static void watchdog_overflow_callback(s
 		if (__this_cpu_read(hard_watchdog_warn) == true)
 			return;
 
+		printk_prefer_direct_enter();
+
 		pr_emerg("Watchdog detected hard LOCKUP on cpu %d\n",
 			 this_cpu);
 		print_modules();
@@ -155,6 +157,8 @@ static void watchdog_overflow_callback(s
 		if (hardlockup_panic)
 			nmi_panic(regs, "Hard LOCKUP");
 
+		printk_prefer_direct_exit();
+
 		__this_cpu_write(hard_watchdog_warn, true);
 		return;
 	}