@clyde_reichert
There are a few approaches to keep multiple TensorFlow queues synchronized:
Here is an example of how to use tf.train.QueueRunner to synchronize queues:
import tensorflow as tf

# Define your queues
queue1 = tf.FIFOQueue(...)
queue2 = tf.FIFOQueue(...)

# Define enqueue operations
enqueue_op1 = queue1.enqueue(...)
enqueue_op2 = queue2.enqueue(...)

# Create a QueueRunner for each queue (tf.train.QueueRunner takes a single
# queue plus the enqueue ops that feed it)
queue_runner1 = tf.train.QueueRunner(queue1, [enqueue_op1])
queue_runner2 = tf.train.QueueRunner(queue2, [enqueue_op2])

# Initialize the variables
init_op = tf.global_variables_initializer()

# Start the QueueRunner threads
with tf.Session() as sess:
    # Initialize the variables
    sess.run(init_op)

    # Start both QueueRunners under a single coordinator so the two
    # queues are filled and stopped together
    coord = tf.train.Coordinator()
    queue_threads = queue_runner1.create_threads(sess, coord=coord, start=True)
    queue_threads += queue_runner2.create_threads(sess, coord=coord, start=True)

    # Start your training loop
    for step in range(num_steps):
        # Perform your computations
        sess.run(...)

    # Stop the QueueRunner threads
    coord.request_stop()
    coord.join(queue_threads)
Note that in the example above, the QueueRunner threads take care of enqueuing elements into the queues in the background. This allows you to simply call sess.run() on the operations that dequeue from the queues (or that depend on the dequeued elements).
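For instance, here is a minimal sketch of what those downstream ops might look like, assuming the two queues hold numeric tensors with compatible shapes (the combine op is purely illustrative):

# Build ops on the dequeued tensors at graph-construction time
# (assumes compatible numeric dtypes/shapes; purely a placeholder)
x1 = queue1.dequeue()
x2 = queue2.dequeue()
combined = tf.add(x1, x2)

# Inside the training loop, just run the downstream op; the QueueRunner
# threads keep both queues filled in the background
result = sess.run(combined)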
@clyde_reichert
Additionally, you can use tf.train.string_input_producer to feed multiple filenames into a queue as string tensors, then use a reader such as tf.WholeFileReader or tf.TextLineReader to read the content of the files. Below is an example of how to synchronize multiple queues that read inputs from multiple files:
import tensorflow as tf

# List of input filenames
filenames = ['file1.csv', 'file2.csv', 'file3.csv']

# Create a string input producer to enqueue the filenames
filename_queue = tf.train.string_input_producer(filenames, shuffle=True)

# Define how to read the content of each file, one line at a time
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Create multiple queues to store the read values
queue1 = tf.FIFOQueue(...)
queue2 = tf.FIFOQueue(...)

# Enqueue the values read from the files into the queues
enqueue_op1 = queue1.enqueue(value)
enqueue_op2 = queue2.enqueue(value)

# Register a QueueRunner for each queue so that start_queue_runners()
# also launches threads to keep these queues filled
tf.train.add_queue_runner(tf.train.QueueRunner(queue1, [enqueue_op1]))
tf.train.add_queue_runner(tf.train.QueueRunner(queue2, [enqueue_op2]))

# Define the batch size for dequeueing
batch_size = 32

# Dequeue batch_size elements from each queue
dequeue_op1 = queue1.dequeue_many(batch_size)
dequeue_op2 = queue2.dequeue_many(batch_size)

# Start a session to run operations
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # Run both dequeue operations in the same sess.run() call so the
    # queues advance in lockstep
    for step in range(num_steps):
        values1, values2 = sess.run([dequeue_op1, dequeue_op2])
        # Perform computations using the dequeued values

    coord.request_stop()
    coord.join(threads)
In this example, the filenames are enqueued by the string input producer, each line is read by the TextLineReader, and the read values are pushed into two different queues by their QueueRunner threads. Dequeuing a batch from both queues inside a single sess.run() call keeps them in step, so you can read inputs from multiple files while keeping the queues synchronized during training.
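As a follow-up, if the lines are CSV records you would normally decode them before batching. Here is a minimal sketch, assuming each line holds two float columns (the column layout and record_defaults are assumptions, not taken from the files above); tf.train.batch adds its own queue and QueueRunner, so the two columns always come out as aligned batches:

# Hypothetical CSV layout: two float columns per line
record_defaults = [[0.0], [0.0]]
col1, col2 = tf.decode_csv(value, record_defaults=record_defaults)

# Batch the decoded columns together; both tensors come from the same
# read, so each batch stays aligned
col1_batch, col2_batch = tf.train.batch([col1, col2], batch_size=32)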