Queue.h
Go to the documentation of this file.
122 template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
124 template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
129 template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
148 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
151 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
251 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
253 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
265 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
274 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
296 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
297 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
337 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
346 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
436OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
451OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
468OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
549 debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
562 debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
588 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
595 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
Definition: InstanceId.h:47
Definition: Queue.h:168
virtual int remotesCount() const =0
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:187
void clearAllReaderSignals()
clears all reader notifications received by the local process
Definition: Queue.cc:172
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
virtual int remotesIdOffset() const =0
bool peek(int &remoteProcessId, Value &value) const
peeks at the item likely to be pop()ed next
Definition: Queue.h:568
virtual const QueueReader & localReader() const =0
int inSize(const int remoteProcessId) const
number of items in incoming queue from a given remote process
Definition: Queue.h:204
void stat(std::ostream &) const
prints current state; suitable for cache manager reports
Definition: Queue.h:586
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool push(const int remoteProcessId, const Value &value)
calls OneToOneUniQueue::push() using the given process queue
Definition: Queue.h:558
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
bool pop(int &remoteProcessId, Value &value)
picks a process and calls OneToOneUniQueue::pop() using its queue
Definition: Queue.h:540
int outSize(const int remoteProcessId) const
number of items in outgoing queue to a given remote process
Definition: Queue.h:207
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:180
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Definition: Queue.h:263
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:351
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:270
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:271
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:269
Definition: Queue.h:243
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:267
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:302
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:289
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:246
const QueueReader & remoteReader(const int processId) const override
Definition: Queue.cc:324
const QueueReader & localReader() const override
Definition: Queue.cc:318
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:309
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:253
bool findOldest(const int remoteProcessId, Value &value) const
Definition: Queue.h:611
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:295
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:233
Definition: FlexibleArray.h:27
Definition: Pointer.h:26
Definition: Pointer.h:83
Definition: Queue.h:335
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:342
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:453
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:343
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:341
Definition: Queue.h:317
const QueueReader & reader(const int processId) const
Definition: Queue.cc:404
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
bool validProcessId(const int processId) const
Definition: Queue.cc:386
const QueueReader & localReader() const override
Definition: Queue.cc:424
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:418
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:373
const QueueReader & remoteReader(const int remoteProcessId) const override
Definition: Queue.cc:430
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:393
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:412
Definition: Queue.h:96
Definition: Queue.h:97
Definition: Queue.h:93
unsigned int theIn
current push() position; reporting aside, used only in push()
Definition: Queue.h:134
bool peek(Value &value) const
returns true iff the value was set; the value may be stale!
Definition: Queue.h:403
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
bool push(const Value &value, QueueReader *const reader=nullptr)
returns true iff the caller must notify the reader of the pushed item
Definition: Queue.h:419
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
void statIn(std::ostream &, int localProcessId, int remoteProcessId) const
prints incoming queue state; suitable for cache manager reports
Definition: Queue.h:436
bool pop(Value &value, QueueReader *const reader=nullptr)
returns true iff the value was set; [un]blocks the reader as needed
Definition: Queue.h:373
void statOut(std::ostream &, int localProcessId, int remoteProcessId) const
prints outgoing queue state; suitable for cache manager reports
Definition: Queue.h:451
void statSamples(std::ostream &, unsigned int start, uint32_t size) const
report a sample of [start, start + size) items
Definition: Queue.h:468
void statRange(std::ostream &, unsigned int start, uint32_t n) const
statSamples() helper that reports n items from start
Definition: Queue.h:503
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
unsigned int theOut
current pop() position; reporting aside, used only in pop()/peek()
Definition: Queue.h:135
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
Definition: Queue.h:29
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:67
std::atomic< bool > popSignal
whether writer has sent and reader has not received notification
Definition: Queue.h:54
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:64
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:50
bool signaled() const
whether writer has sent and reader has not received notification
Definition: Queue.h:37
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
Definition: SquidString.h:26
Definition: IpcIoFile.h:24
size_t sharedMemorySize() const
Definition: Queue.h:252
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:343
static size_t SharedMemorySize(const int, const int, const int, const int)
Definition: Queue.h:253
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:447
static size_t SharedMemorySize(const int, const int)
Definition: Queue.h:327