Don't spawn a goroutine for every event recording #95664
Conversation
Additional points: I introduced a new method with a longer queue size in an attempt to be minimally disruptive to the other, much more important use of this code: watch broadcasting. The longer queue size is to avoid dropping events on the floor when possible, and brings the incoming queue in line with the outgoing queues, which already had the "drop events on the floor if needed" behavior.
/assign @yliaog
799a7b4 to 7a96e6a Compare
@@ -198,6 +214,18 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}

// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queue up. Returns true if the action was sent,
s/queue/queued/
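For readers skimming the diff, the non-blocking behavior of `ActionOrDrop` boils down to a `select` with a `default` case. Below is a minimal sketch of that pattern against the `Broadcaster` type in `k8s.io/apimachinery/pkg/watch`; it illustrates the approach rather than reproducing the exact merged code.

```go
// ActionOrDrop, sketched: try to enqueue the event on the buffered incoming
// channel; if the buffer is full, fall through to the default case and report
// that the event was dropped instead of blocking the caller.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
	select {
	case m.incoming <- Event{action, obj}:
		return true
	default:
		return false
	}
}
```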
func NewLongQueueBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers: map[int64]*broadcasterWatcher{},
		incoming: make(chan Event, queueLength),
So effectively the incoming queue length is increased from 25 to 1000. Any idea why it was only 25 in the first place? The comment below indicates it should rarely accumulate anything; did you see events dropped due to the incoming queue length?
// Buffer the incoming queue a little bit even though it should rarely ever accumulate
// anything, just in case a few events are received in such a short window that
// Broadcaster can't move them onto the watchers' queues fast enough.
since we're no longer blocking, I figured I'd extend it to match the outgoing queue, which currently already has a "drop on full behavior". As much as possible, I wanted to avoid "breaking" something in k/k itself, so I figured matching the outgoing queues was a safe bet.
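To make the "match the outgoing queues" idea concrete, here is a hedged sketch of how the recorder side could construct the broadcaster using the new constructor from the diff above. The constant name and the helper function are assumptions for illustration; `DropIfChannelFull` is the existing `FullChannelBehavior` value that gives the drop-on-the-floor semantics.

```go
package record // hypothetical package for illustration

import "k8s.io/apimachinery/pkg/watch"

// maxQueuedEvents is an assumed name; the point is simply that the incoming
// buffer is sized to match the per-watcher (outgoing) queues, so events are
// only dropped under sustained overload rather than on small bursts.
const maxQueuedEvents = 1000

// newRecorderBroadcaster is a hypothetical helper showing the constructor
// from the diff above: a longer incoming queue plus the existing drop-on-full
// behavior for the outgoing side.
func newRecorderBroadcaster() *watch.Broadcaster {
	return watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
}
```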
// NOTE: events should be a non-blocking operation, but we also need to not
// put this in a goroutine, otherwise we'll race to write to a closed channel
// when we go to shut down this broadcaster. Just drop events if we get overloaded,
// and log an error us if that happens (we've configured the broadcaster to drop
s/log an error us if/log an error if/
@@ -101,6 +102,29 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
	}
}

func TestNonRacyShutdown(t *testing.T) {
	// Attempt to simulate previously racy conditions, and ensure that no race
The test by itself does not assert that there is no race condition; it relies on the test being run with the race detector enabled.
IIRC, that's not true -- the "race" is pretty easily detected as a write to a closed channel, which works even w/o the race detector (it just panics)
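A tiny standalone illustration of that point follows: sending on a closed channel panics deterministically, with or without the race detector (the `recover` is only there so the program can print the panic value instead of crashing).

```go
package main

import "fmt"

func main() {
	defer func() {
		// Prints "recovered: send on closed channel" even without -race.
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()

	ch := make(chan int, 1)
	close(ch)

	// This send always panics; this is the failure mode the goroutine-per-event
	// approach could hit when an event raced with broadcaster shutdown.
	ch <- 1
}
```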
	// and log an error us if that happens (we've configured the broadcaster to drop
	// outgoing events anyway).
	if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
		klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
is dropping an event an error? could it possibly spam the logs when too many events are dropped?
IIRC, we log when we drop from the other end. It's unlikely that this happens, you probably want to know when it does, and we log events anyway.
/lgtm
/retest
/assign @caesarxuchao (for approval)
LGTM. Can you fix the typos Yu pointed out?
This changes the event recorder to use the equivalent of a select statement instead of a goroutine to record events. Previously, we used a goroutine to make event recording non-blocking. Unfortunately, this writes to a channel, and during shutdown we then race to write to a closed channel, panicking (caught by the error handler, but still) and making the race detector unhappy. Instead, we now use the select statement to make event emitting non-blocking, and if we'd block, we just drop the event. We already drop events if a particular sink is overloaded, so this just moves the incoming event queue to match that behavior (and makes the incoming event queue much longer). This means that, if the user uses `Eventf` and friends correctly (i.e. ensure they've returned by the time we call `Shutdown`), it's now safe to call Shutdown. This matches the conventional Go guidance on channels: the writer should call close.
7a96e6a to e90e67b Compare
/lgtm
/lgtm
@caesarxuchao still needs approved label, looks like
/approve
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: caesarxuchao, DirectXMan12
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing `/approve` in a comment.
/retest Review the full test history for this PR. Silence the bot with an `/lgtm cancel` or `/hold` comment for consistent failures.
1 similar comment
/retest Review the full test history for this PR. Silence the bot with an `/lgtm cancel` or `/hold` comment for consistent failures.
Can this change be backported to 1.20?
This changes the event recorder to use the equivalent of a select
statement instead of a goroutine to record events.

Previously, we used a goroutine to make event recording non-blocking.
Unfortunately, this writes to a channel, and during shutdown we then
race to write to a closed channel, panicking (caught by the error
handler, but still) and making the race detector unhappy.

Instead, we now use the select statement to make event emitting
non-blocking, and if we'd block, we just drop the event. We already
drop events if a particular sink is overloaded, so this just moves the
incoming event queue to match that behavior (and makes the incoming
event queue much longer).

This means that, if the user uses `Eventf` and friends correctly (i.e.
ensure they've returned by the time they call `Shutdown`), it's now
safe to call `Shutdown`. This matches the conventional Go guidance on
channels: the writing goroutine should call close.
Fixes #94906
/kind bug
/sig api-machinery
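As a usage note on the ordering requirement above (every `Eventf` call must have returned before `Shutdown` is called), here is a hedged sketch of how a caller might enforce that ordering with a WaitGroup. The component name, the placeholder object, and the overall wiring are illustrative assumptions, not code from this PR.

```go
package main

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func main() {
	broadcaster := record.NewBroadcaster()
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Every goroutine that records events must return before Shutdown is
		// called; the WaitGroup enforces that ordering.
		pod := &v1.Pod{} // placeholder object to attach the event to
		recorder.Eventf(pod, v1.EventTypeNormal, "Example", "recorded before shutdown")
	}()

	// Wait for all event writers first, then shut the broadcaster down. This is
	// the "writer closes the channel" ordering the description refers to.
	wg.Wait()
	broadcaster.Shutdown()
}
```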